REST API Integration
This guide focuses on implementing the REST API aspects of your exchange adapter. Proper REST API integration is crucial for fetching historical candle data and providing a fallback when WebSocket connectivity is unavailable.
Understanding Exchange REST API
Before implementing, thoroughly understand how the exchange's REST API works:
graph TD
A[Study API Documentation] --> B{Requires Authentication?}
B -->|Yes| C[Implement Auth Mechanism]
B -->|No| D[Identify Candles Endpoint]
C --> D
D --> E[Understand Parameter Format]
E --> F[Determine Response Format]
F --> G[Check Rate Limits]
G --> H[Implement REST Methods]
REST Methods Implementation
Your adapter needs to implement these REST-related methods:
- get_rest_url(): Returns the REST API URL
- get_rest_params(): Creates parameters for the REST request
- parse_rest_response(): Parses the REST API response into CandleData objects
Let's look at each in detail:
1. Implementing get_rest_url()
def get_rest_url(self) -> str:
    """Get REST API URL for candles.

    Returns:
        REST API URL
    """
    # For most exchanges, this is a simple concatenation
    return f"{REST_URL}{CANDLES_ENDPOINT}"

    # Some exchanges might require path parameters in the URL
    # Example: return f"{REST_URL}{CANDLES_ENDPOINT}".format(product_id=some_id)
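The path-parameter case mentioned in the comment can be sketched as follows. The base URL, the endpoint template, and the `build_candles_url` helper are hypothetical placeholders chosen for illustration; the real values come from the exchange's documentation.

```python
# Hypothetical constants; replace them with the values from the exchange's docs.
REST_URL = "https://api.example-exchange.invalid"
CANDLES_ENDPOINT = "/products/{product_id}/candles"


def build_candles_url(product_id: str) -> str:
    """Fill the path parameter in the endpoint template and join it to the base URL."""
    return f"{REST_URL}{CANDLES_ENDPOINT.format(product_id=product_id)}"


print(build_candles_url("BTC-USD"))
```

Keeping the placeholder in the constant, rather than building the path inline, leaves the endpoint definition in one place.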
2. Implementing get_rest_params()
This method creates the parameter dictionary for REST API requests:
def get_rest_params(self,
                    trading_pair: str,
                    interval: str,
                    start_time: Optional[int] = None,
                    end_time: Optional[int] = None,
                    limit: Optional[int] = None) -> dict:
    """Get parameters for REST API request."""
    # Basic parameters needed for most exchanges
    params = {
        "symbol": self.get_trading_pair_format(trading_pair),
        "interval": self._convert_interval_to_exchange_format(interval),
        "limit": limit or MAX_RESULTS_PER_CANDLESTICK_REST_REQUEST
    }
    # Handle timestamps according to exchange requirements
    if start_time is not None:
        # Some exchanges require milliseconds
        params["startTime"] = start_time * 1000  # Convert to milliseconds
        # Or some might use a different parameter name
        # params["from"] = start_time
    if end_time is not None:
        # Some exchanges require milliseconds
        params["endTime"] = end_time * 1000  # Convert to milliseconds
        # Or some might use a different parameter name
        # params["to"] = end_time
    return params
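Because most exchanges cap the number of candles per request, a long historical range usually has to be split into several requests, each with its own `start_time` and `end_time`. The sketch below shows one way to compute those windows. `iter_time_windows` is a hypothetical helper, not part of the framework, and it assumes timestamps in seconds with half-open windows; an exchange whose range is inclusive on both ends may return the boundary candle twice, so de-duplicating by timestamp may be needed.

```python
from typing import Iterator, Tuple


def iter_time_windows(start_time: int,
                      end_time: int,
                      interval_seconds: int,
                      max_candles: int) -> Iterator[Tuple[int, int]]:
    """Yield (window_start, window_end) pairs in seconds.

    Each window spans at most max_candles candles of interval_seconds each.
    """
    if interval_seconds <= 0 or max_candles <= 0:
        raise ValueError("interval_seconds and max_candles must be positive")
    span = interval_seconds * max_candles
    window_start = start_time
    while window_start < end_time:
        window_end = min(window_start + span, end_time)
        yield window_start, window_end
        window_start = window_end


# 250 one-minute candles with a cap of 100 candles per request
for window in iter_time_windows(0, 250 * 60, 60, 100):
    print(window)
```

Each yielded pair can then be passed as `start_time` and `end_time` to `get_rest_params()`.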
3. Implementing parse_rest_response()
This method parses the exchange's response into standardized CandleData objects:
def parse_rest_response(self, data: dict) -> List[CandleData]:
    """Parse REST API response into CandleData objects."""
    candles = []
    # Extract data based on exchange's response format
    # This will vary significantly between exchanges
    response_data = data.get("candles", [])  # Adjust key based on exchange's format
    for item in response_data:
        # For array-based responses
        if isinstance(item, list):
            # Adjust indices based on exchange's format
            candles.append(CandleData(
                timestamp_raw=item[0],  # Timestamp
                open=float(item[1]),  # Open price
                high=float(item[2]),  # High price
                low=float(item[3]),  # Low price
                close=float(item[4]),  # Close price
                volume=float(item[5]),  # Volume
                quote_asset_volume=float(item[6]) if len(item) > 6 else 0.0,
                n_trades=int(item[7]) if len(item) > 7 else 0,
                taker_buy_base_volume=float(item[8]) if len(item) > 8 else 0.0,
                taker_buy_quote_volume=float(item[9]) if len(item) > 9 else 0.0
            ))
        # For object-based responses
        elif isinstance(item, dict):
            # Adjust keys based on exchange's format
            candles.append(CandleData(
                timestamp_raw=item.get("time", 0),  # Timestamp
                open=float(item.get("open", 0)),  # Open price
                high=float(item.get("high", 0)),  # High price
                low=float(item.get("low", 0)),  # Low price
                close=float(item.get("close", 0)),  # Close price
                volume=float(item.get("volume", 0)),  # Volume
                quote_asset_volume=float(item.get("quoteVolume", 0)),
                n_trades=int(item.get("trades", 0)),
                taker_buy_base_volume=float(item.get("takerBuyVolume", 0)),
                taker_buy_quote_volume=float(item.get("takerBuyQuoteVolume", 0))
            ))
    return candles
Common REST API Patterns
Different exchanges structure their REST APIs differently. Here are common patterns:
Timestamp Formats
Exchanges use various timestamp formats:
graph TD
A[Timestamp Formats] --> B[Seconds]
A --> C[Milliseconds]
A --> D[ISO 8601]
B --> B1["start: 1625097600"]
C --> C1["startTime: 1625097600000"]
D --> D1["start_time: '2021-07-01T00:00:00Z'"]
Always validate the timestamps and convert them to the framework's standard format (seconds).
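As a rough illustration of that conversion, the sketch below normalizes the three formats shown in the diagram to whole seconds. `to_seconds` is a hypothetical helper, and the magnitude threshold used to tell seconds from milliseconds is a heuristic assumption; when the exchange documents its unit, convert explicitly instead of guessing.

```python
from datetime import datetime, timezone
from typing import Union


def to_seconds(value: Union[int, float, str]) -> int:
    """Normalize a timestamp in seconds, milliseconds, or ISO 8601 to whole seconds (UTC)."""
    if isinstance(value, str):
        text = value.strip()
        try:
            value = float(text)  # numeric string such as "1625097600000"
        except ValueError:
            # ISO 8601 string; older Python versions reject a trailing "Z",
            # so rewrite it as an explicit UTC offset first.
            parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                # Assumption: naive timestamps are treated as UTC.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return int(parsed.timestamp())
    # Heuristic: values of 1e11 or more are treated as milliseconds.
    if value >= 1e11:
        return int(value // 1000)
    return int(value)


print(to_seconds(1625097600))
print(to_seconds(1625097600000))
print(to_seconds("2021-07-01T00:00:00Z"))
```

All three calls refer to the same instant, so they print the same value in seconds.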
Interval Formats
Exchanges use different formats for intervals:
Exchange | 1 minute | 1 hour | 1 day |
---|---|---|---|
Binance | "1m" | "1h" | "1d" |
Bybit | "1" | "60" | "D" |
KuCoin | "1min" | "1hour" | "1day" |
OKX | "1m" | "1H" | "1D" |
Create a mapping in your constants file:
INTERVAL_TO_EXCHANGE_FORMAT = {
    "1m": "1min",  # Standard -> Exchange format
    "5m": "5min",
    "1h": "60min",
    "1d": "1day"
}
Then use it in your get_rest_params() method:
params["interval"] = INTERVAL_TO_EXCHANGE_FORMAT.get(interval, interval)
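The `.get(interval, interval)` fallback sends an unmapped interval to the exchange unchanged, which typically surfaces later as an exchange-side error. One alternative, sketched below, is to fail early with a clear message. `to_exchange_interval` is a hypothetical helper, and the mapping simply repeats the example values above.

```python
# Example mapping repeated from the constants file (standard -> exchange format).
INTERVAL_TO_EXCHANGE_FORMAT = {
    "1m": "1min",
    "5m": "5min",
    "1h": "60min",
    "1d": "1day",
}


def to_exchange_interval(interval: str) -> str:
    """Translate a standard interval, rejecting ones the exchange does not support."""
    try:
        return INTERVAL_TO_EXCHANGE_FORMAT[interval]
    except KeyError:
        supported = ", ".join(sorted(INTERVAL_TO_EXCHANGE_FORMAT))
        raise ValueError(
            f"Unsupported interval {interval!r}; supported: {supported}"
        ) from None


print(to_exchange_interval("1h"))
```

Whether to fail early or pass the value through is a design choice; failing early keeps the error close to the caller that supplied the interval.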
Response Formats
Exchanges structure their responses differently:
Array-Based Response
{
  "data": [
    [1625097600, "34500.5", "34650.0", "34400.2", "34600.1", "120.5"],
    [1625097660, "34600.1", "34700.0", "34550.0", "34650.8", "95.2"]
  ]
}
Object-Based Response
{
  "candles": [
    {
      "time": 1625097600,
      "open": "34500.5",
      "high": "34650.0",
      "low": "34400.2",
      "close": "34600.1",
      "volume": "120.5"
    },
    {
      "time": 1625097660,
      "open": "34600.1",
      "high": "34700.0",
      "low": "34550.0",
      "close": "34650.8",
      "volume": "95.2"
    }
  ]
}
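One way to handle both shapes is to normalize each row into a dictionary before building candle objects. The sketch below does this with plain dictionaries. The `rows_from_response` helper and the `ARRAY_FIELDS` column order are assumptions that match the two sample payloads above, not any particular exchange.

```python
from typing import Any, Dict, List

# Assumed column order for array-based rows; adjust to the exchange's docs.
ARRAY_FIELDS = ("time", "open", "high", "low", "close", "volume")


def rows_from_response(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return candle rows as dicts, whichever of the two shapes the payload uses."""
    rows = payload.get("data") or payload.get("candles") or []
    normalized = []
    for row in rows:
        if isinstance(row, (list, tuple)):
            # Pair each positional value with its field name.
            normalized.append(dict(zip(ARRAY_FIELDS, row)))
        elif isinstance(row, dict):
            normalized.append(dict(row))
        else:
            raise ValueError(f"Unexpected candle row: {row!r}")
    return normalized


array_payload = {"data": [[1625097600, "34500.5", "34650.0", "34400.2", "34600.1", "120.5"]]}
object_payload = {"candles": [{"time": 1625097600, "open": "34500.5", "high": "34650.0",
                               "low": "34400.2", "close": "34600.1", "volume": "120.5"}]}
print(rows_from_response(array_payload) == rows_from_response(object_payload))
```

After this step, the rest of the parser only has to deal with keyed fields.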
Error Handling
Implement robust error handling in your REST API methods:
import logging

logger = logging.getLogger(__name__)

def parse_rest_response(self, data: dict) -> List[CandleData]:
    """Parse REST API response into CandleData objects."""
    candles = []
    # Check for error response
    if "error" in data and data["error"]:
        error_msg = data.get("message", "Unknown error")
        raise ValueError(f"Exchange error: {error_msg}")
    # Check if data is in expected format
    if "candles" not in data:
        raise ValueError(f"Unexpected response format: {data}")
    # Parse candles with validation
    for item in data["candles"]:
        try:
            candles.append(CandleData(
                timestamp_raw=item.get("time", 0),
                open=float(item.get("open", 0)),
                high=float(item.get("high", 0)),
                low=float(item.get("low", 0)),
                close=float(item.get("close", 0)),
                volume=float(item.get("volume", 0))
            ))
        except (ValueError, TypeError, AttributeError) as e:
            # AttributeError covers rows that are not dicts (no .get method)
            # Log error but continue processing
            logger.warning(f"Error parsing candle data: {e}, data: {item}")
    return candles
Rate Limiting
Respect the exchange's rate limits to avoid being banned:
# In your constants.py file
RATE_LIMITS = [
    RateLimit(limit_id="CANDLES", limit=60, time_interval=60)  # 60 requests per minute
]

# In your adapter implementation
def get_rate_limits(self) -> List[RateLimit]:
    """Get rate limits for the exchange."""
    return RATE_LIMITS
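To make the meaning of `limit=60, time_interval=60` concrete, the sketch below shows a sliding-window check of the kind a client could apply before each request. `SlidingWindowLimiter` is a hypothetical class written for illustration only; it is not the framework's throttler, which may enforce the declared limits differently.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests in any `time_interval`-second window."""

    def __init__(self, limit: int, time_interval: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._limit = limit
        self._interval = time_interval
        self._clock = clock  # injectable so the limiter can be tested without sleeping
        self._sent: Deque[float] = deque()

    def wait_time(self) -> float:
        """Return the seconds to wait before the next request (0.0 if allowed now)."""
        now = self._clock()
        # Drop requests that have left the window.
        while self._sent and now - self._sent[0] >= self._interval:
            self._sent.popleft()
        if len(self._sent) < self._limit:
            return 0.0
        return self._interval - (now - self._sent[0])

    def record(self) -> None:
        """Register that a request was just sent."""
        self._sent.append(self._clock())


limiter = SlidingWindowLimiter(limit=60, time_interval=60)
if limiter.wait_time() == 0.0:
    limiter.record()  # safe to send the request now
```

A caller would sleep for `wait_time()` seconds when it is non-zero, then call `record()` right after sending the request.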
Testing REST API Integration
Test your REST implementation with various scenarios:
- Fetching recent candles
- Fetching historical candles
- Handling pagination
- Error conditions
- Rate limit behavior
Example test:
def test_parse_rest_response():
    """Test parsing REST response."""
    adapter = YourExchangeAdapter()
    # Sample response data
    response = {
        "candles": [
            {
                "time": 1625097600,
                "open": "34500.5",
                "high": "34650.0",
                "low": "34400.2",
                "close": "34600.1",
                "volume": "120.5"
            }
        ]
    }
    candles = adapter.parse_rest_response(response)
    assert len(candles) == 1
    assert candles[0].timestamp == 1625097600
    assert candles[0].open == 34500.5
    assert candles[0].high == 34650.0
    assert candles[0].low == 34400.2
    assert candles[0].close == 34600.1
    assert candles[0].volume == 120.5
Next Steps
After implementing REST API functionality, proceed to:
- WebSocket Integration: Implementing real-time data collection
- Testing Your Adapter: Comprehensive testing strategy