Skip to content

Core Components API Reference

This page documents the core classes and functions of the Candles Feed framework.

CandlesFeed

Main class that coordinates data collection from exchanges.

This class is responsible for creating and managing the components needed to fetch and process candle data from exchanges.

Source code in candles_feed/core/candles_feed.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
class CandlesFeed:
    """Main class that coordinates data collection from exchanges.

    This class is responsible for creating and managing the components needed
    to fetch and process candle data from exchanges.
    """

    def __init__(
        self,
        exchange: str,
        trading_pair: str,
        interval: str = "1m",
        max_records: int = 150,
        logger: Logger | None = None,
    ):
        """Initialize the candles feed.

        Args:
            exchange: Name of the exchange
            trading_pair: Trading pair
            interval: Candle interval
            max_records: Maximum number of candles to store
            logger: Logger instance
        """
        self.exchange = exchange
        self.trading_pair = trading_pair
        self.interval = interval
        self.max_records = max_records
        self.logger = logger or logging.getLogger(__name__)

        # Get adapter from registry
        self._adapter = ExchangeRegistry.get_adapter_instance(exchange)
        self.ex_trading_pair = self._adapter.get_trading_pair_format(trading_pair)

        # Initialize components
        self._candles: Deque[CandleData] = deque(maxlen=max_records)
        self._network_client = NetworkClient()
        self._data_processor = DataProcessor()

        # Strategy attributes
        self._rest_strategy = None
        self._ws_strategy = None
        self._active = False
        self._using_ws = False

    def _create_ws_strategy(self):
        """Create a WebSocket strategy instance.

        This is a helper method that can be mocked in tests.
        """
        return WebSocketStrategy(
            network_client=self._network_client,
            adapter=self._adapter,
            trading_pair=self.trading_pair,
            interval=self.interval,
            data_processor=self._data_processor,
            candles_store=self._candles,
        )

    def _create_rest_strategy(self):
        """Create a REST polling strategy instance.

        This is a helper method that can be mocked in tests.
        """
        return RESTPollingStrategy(
            network_client=self._network_client,
            adapter=self._adapter,
            trading_pair=self.trading_pair,
            interval=self.interval,
            data_processor=self._data_processor,
            candles_store=self._candles,
        )

    async def start(self, strategy: str = "auto") -> None:
        """Start the feed.

        Args:
            strategy: Strategy to use ("auto", "websocket", or "polling")
        """
        if self._active:
            return

        self.logger.info(f"Starting candles feed for {self.trading_pair} on {self.exchange}")

        # Determine which strategy to use
        use_websocket = False

        if strategy == "auto":
            # Check if the interval is supported by websocket
            ws_intervals = self._adapter.get_ws_supported_intervals()
            use_websocket = self.interval in ws_intervals
        elif strategy == "websocket":
            use_websocket = True

        # Create and start appropriate strategy
        if use_websocket:
            # Only create a new strategy if one doesn't exist already (for testing)
            if not self._ws_strategy:
                self._ws_strategy = self._create_ws_strategy()

            if not self._ws_strategy:
                raise ValueError("WebSocket strategy not supported by this adapter")

            await self._ws_strategy.start()
            self._using_ws = True
        else:
            # Only create a new strategy if one doesn't exist already (for testing)
            if not self._rest_strategy:
                self._rest_strategy = self._create_rest_strategy()

            if not self._rest_strategy:
                raise ValueError("WebSocket strategy not supported by this adapter")

            await self._rest_strategy.start()
            self._using_ws = False

        self._active = True

    async def stop(self) -> None:
        """Stop the feed."""
        if not self._active:
            return

        self.logger.info(f"Stopping candles feed for {self.trading_pair}")

        if self._using_ws and self._ws_strategy:
            await self._ws_strategy.stop()
        elif self._rest_strategy:
            await self._rest_strategy.stop()

        # Clean up network client resources
        await self._network_client.close()

        self._active = False

    def get_candles_df(self) -> pd.DataFrame:
        """Get candles as a pandas DataFrame.

        Returns:
            DataFrame with candle data
        """
        return pd.DataFrame(
            [
                {
                    "timestamp": c.timestamp,
                    "open": c.open,
                    "high": c.high,
                    "low": c.low,
                    "close": c.close,
                    "volume": c.volume,
                    "quote_asset_volume": c.quote_asset_volume,
                    "n_trades": c.n_trades,
                    "taker_buy_base_volume": c.taker_buy_base_volume,
                    "taker_buy_quote_volume": c.taker_buy_quote_volume,
                }
                for c in self._candles
            ]
        )

    async def fetch_candles(
        self, start_time: int | None = None, end_time: int | None = None
    ) -> list[CandleData]:
        """Fetch historical candles.

        Args:
            start_time: Start time in seconds (optional)
            end_time: End time in seconds (optional)

        Returns:
            List of candle data objects
        """
        self.logger.info(f"Fetching historical candles for {self.trading_pair} on {self.exchange}")

        # Create REST strategy if it doesn't exist
        if not self._rest_strategy:
            self._rest_strategy = self._create_rest_strategy()

        if not self._rest_strategy:
            raise ValueError("REST polling strategy not supported by this adapter")

        # Fetch candles
        candles = await self._rest_strategy.poll_once(start_time, end_time)

        # Add candles to the store
        if candles:
            # Clear existing candles if fetching from the beginning
            if start_time is None or (
                len(self._candles) > 0 and start_time < self._candles[0].timestamp
            ):
                self._candles.clear()

            # Add each candle to the store
            for candle in candles:
                self._data_processor.process_candle(candle, self._candles)

        return candles

    def get_candles(self) -> list[CandleData]:
        """Get raw candle data.

        Returns:
            List of CandleData objects
        """
        return list(self._candles)

    def add_candle(self, candle: CandleData) -> None:
        """Add a candle to the store.

        Args:
            candle: Candle data to add
        """
        self._data_processor.process_candle(candle, self._candles)

    @property
    def ready(self) -> bool:
        """Check if the feed is ready.

        Returns:
            True if the feed is ready, False otherwise
        """
        return len(self._candles) >= self.max_records * 0.9  # At least 90% filled

    @property
    def last_timestamp(self) -> int | None:
        """Get the timestamp of the most recent candle.

        Returns:
            Timestamp in seconds, or None if no candles available
        """
        return self._candles[-1].timestamp if self._candles else None

    @property
    def first_timestamp(self) -> int | None:
        """Get the timestamp of the oldest candle.

        Returns:
            Timestamp in seconds, or None if no candles available
        """
        return self._candles[0].timestamp if self._candles else None

first_timestamp property

Get the timestamp of the oldest candle.

Returns:

Type Description
int | None

Timestamp in seconds, or None if no candles available

last_timestamp property

Get the timestamp of the most recent candle.

Returns:

Type Description
int | None

Timestamp in seconds, or None if no candles available

ready property

Check if the feed is ready.

Returns:

Type Description
bool

True if the feed is ready, False otherwise

__init__(exchange, trading_pair, interval='1m', max_records=150, logger=None)

Initialize the candles feed.

Parameters:

Name Type Description Default
exchange str

Name of the exchange

required
trading_pair str

Trading pair

required
interval str

Candle interval

'1m'
max_records int

Maximum number of candles to store

150
logger Logger | None

Logger instance

None
Source code in candles_feed/core/candles_feed.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self,
    exchange: str,
    trading_pair: str,
    interval: str = "1m",
    max_records: int = 150,
    logger: Logger | None = None,
):
    """Initialize the candles feed.

    Args:
        exchange: Name of the exchange
        trading_pair: Trading pair
        interval: Candle interval
        max_records: Maximum number of candles to store
        logger: Logger instance
    """
    self.exchange = exchange
    self.trading_pair = trading_pair
    self.interval = interval
    self.max_records = max_records
    self.logger = logger or logging.getLogger(__name__)

    # Get adapter from registry
    self._adapter = ExchangeRegistry.get_adapter_instance(exchange)
    self.ex_trading_pair = self._adapter.get_trading_pair_format(trading_pair)

    # Initialize components
    self._candles: Deque[CandleData] = deque(maxlen=max_records)
    self._network_client = NetworkClient()
    self._data_processor = DataProcessor()

    # Strategy attributes
    self._rest_strategy = None
    self._ws_strategy = None
    self._active = False
    self._using_ws = False

_create_rest_strategy()

Create a REST polling strategy instance.

This is a helper method that can be mocked in tests.

Source code in candles_feed/core/candles_feed.py
85
86
87
88
89
90
91
92
93
94
95
96
97
def _create_rest_strategy(self):
    """Create a REST polling strategy instance.

    This is a helper method that can be mocked in tests.
    """
    return RESTPollingStrategy(
        network_client=self._network_client,
        adapter=self._adapter,
        trading_pair=self.trading_pair,
        interval=self.interval,
        data_processor=self._data_processor,
        candles_store=self._candles,
    )

_create_ws_strategy()

Create a WebSocket strategy instance.

This is a helper method that can be mocked in tests.

Source code in candles_feed/core/candles_feed.py
71
72
73
74
75
76
77
78
79
80
81
82
83
def _create_ws_strategy(self):
    """Create a WebSocket strategy instance.

    This is a helper method that can be mocked in tests.
    """
    return WebSocketStrategy(
        network_client=self._network_client,
        adapter=self._adapter,
        trading_pair=self.trading_pair,
        interval=self.interval,
        data_processor=self._data_processor,
        candles_store=self._candles,
    )

add_candle(candle)

Add a candle to the store.

Parameters:

Name Type Description Default
candle CandleData

Candle data to add

required
Source code in candles_feed/core/candles_feed.py
231
232
233
234
235
236
237
def add_candle(self, candle: CandleData) -> None:
    """Add a candle to the store.

    Args:
        candle: Candle data to add
    """
    self._data_processor.process_candle(candle, self._candles)

fetch_candles(start_time=None, end_time=None) async

Fetch historical candles.

Parameters:

Name Type Description Default
start_time int | None

Start time in seconds (optional)

None
end_time int | None

End time in seconds (optional)

None

Returns:

Type Description
list[CandleData]

List of candle data objects

Source code in candles_feed/core/candles_feed.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
async def fetch_candles(
    self, start_time: int | None = None, end_time: int | None = None
) -> list[CandleData]:
    """Fetch historical candles.

    Args:
        start_time: Start time in seconds (optional)
        end_time: End time in seconds (optional)

    Returns:
        List of candle data objects
    """
    self.logger.info(f"Fetching historical candles for {self.trading_pair} on {self.exchange}")

    # Create REST strategy if it doesn't exist
    if not self._rest_strategy:
        self._rest_strategy = self._create_rest_strategy()

    if not self._rest_strategy:
        raise ValueError("REST polling strategy not supported by this adapter")

    # Fetch candles
    candles = await self._rest_strategy.poll_once(start_time, end_time)

    # Add candles to the store
    if candles:
        # Clear existing candles if fetching from the beginning
        if start_time is None or (
            len(self._candles) > 0 and start_time < self._candles[0].timestamp
        ):
            self._candles.clear()

        # Add each candle to the store
        for candle in candles:
            self._data_processor.process_candle(candle, self._candles)

    return candles

get_candles()

Get raw candle data.

Returns:

Type Description
list[CandleData]

List of CandleData objects

Source code in candles_feed/core/candles_feed.py
223
224
225
226
227
228
229
def get_candles(self) -> list[CandleData]:
    """Get raw candle data.

    Returns:
        List of CandleData objects
    """
    return list(self._candles)

get_candles_df()

Get candles as a pandas DataFrame.

Returns:

Type Description
DataFrame

DataFrame with candle data

Source code in candles_feed/core/candles_feed.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def get_candles_df(self) -> pd.DataFrame:
    """Get candles as a pandas DataFrame.

    Returns:
        DataFrame with candle data
    """
    return pd.DataFrame(
        [
            {
                "timestamp": c.timestamp,
                "open": c.open,
                "high": c.high,
                "low": c.low,
                "close": c.close,
                "volume": c.volume,
                "quote_asset_volume": c.quote_asset_volume,
                "n_trades": c.n_trades,
                "taker_buy_base_volume": c.taker_buy_base_volume,
                "taker_buy_quote_volume": c.taker_buy_quote_volume,
            }
            for c in self._candles
        ]
    )

start(strategy='auto') async

Start the feed.

Parameters:

Name Type Description Default
strategy str

Strategy to use ("auto", "websocket", or "polling")

'auto'
Source code in candles_feed/core/candles_feed.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def start(self, strategy: str = "auto") -> None:
    """Start the feed.

    Args:
        strategy: Strategy to use ("auto", "websocket", or "polling")
    """
    if self._active:
        return

    self.logger.info(f"Starting candles feed for {self.trading_pair} on {self.exchange}")

    # Determine which strategy to use
    use_websocket = False

    if strategy == "auto":
        # Check if the interval is supported by websocket
        ws_intervals = self._adapter.get_ws_supported_intervals()
        use_websocket = self.interval in ws_intervals
    elif strategy == "websocket":
        use_websocket = True

    # Create and start appropriate strategy
    if use_websocket:
        # Only create a new strategy if one doesn't exist already (for testing)
        if not self._ws_strategy:
            self._ws_strategy = self._create_ws_strategy()

        if not self._ws_strategy:
            raise ValueError("WebSocket strategy not supported by this adapter")

        await self._ws_strategy.start()
        self._using_ws = True
    else:
        # Only create a new strategy if one doesn't exist already (for testing)
        if not self._rest_strategy:
            self._rest_strategy = self._create_rest_strategy()

        if not self._rest_strategy:
            raise ValueError("WebSocket strategy not supported by this adapter")

        await self._rest_strategy.start()
        self._using_ws = False

    self._active = True

stop() async

Stop the feed.

Source code in candles_feed/core/candles_feed.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
async def stop(self) -> None:
    """Stop the feed."""
    if not self._active:
        return

    self.logger.info(f"Stopping candles feed for {self.trading_pair}")

    if self._using_ws and self._ws_strategy:
        await self._ws_strategy.stop()
    elif self._rest_strategy:
        await self._rest_strategy.stop()

    # Clean up network client resources
    await self._network_client.close()

    self._active = False

CandleData

Standardized candle data representation.

This class provides a structured and type-safe representation of candle data with automatic timestamp normalization.

Attributes:

Name Type Description
timestamp int

The candle timestamp in seconds

open float

Opening price

high float

Highest price during period

low float

Lowest price during period

close float

Closing price

volume float

Trading volume

quote_asset_volume float

Quote asset volume (optional)

n_trades int

Number of trades (optional)

taker_buy_base_volume float

Base asset volume from taker buys (optional)

taker_buy_quote_volume float

Quote asset volume from taker buys (optional)

Source code in candles_feed/core/candle_data.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@dataclass
class CandleData:
    """Standardized candle data representation.

    This class provides a structured and type-safe representation of candle data
    with automatic timestamp normalization.

    Attributes:
        timestamp: The candle timestamp in seconds
        open: Opening price
        high: Highest price during period
        low: Lowest price during period
        close: Closing price
        volume: Trading volume
        quote_asset_volume: Quote asset volume (optional)
        n_trades: Number of trades (optional)
        taker_buy_base_volume: Base asset volume from taker buys (optional)
        taker_buy_quote_volume: Quote asset volume from taker buys (optional)
    """

    timestamp_raw: InitVar[int | float | str | datetime]

    timestamp: int = field(init=False)
    open: float
    high: float
    low: float
    close: float
    volume: float
    quote_asset_volume: float = 0.0
    n_trades: int = 0
    taker_buy_base_volume: float = 0.0
    taker_buy_quote_volume: float = 0.0

    @property
    def timestamp_ms(self) -> int:
        """Convert timestamp to milliseconds."""
        return self.timestamp * 1000

    _timestamp_keys: ClassVar[tuple[str, ...]] = ("timestamp", "time", "t")
    _price_keys: ClassVar[dict[str, tuple[str, ...]]] = {
        "open": ("open", "o"),
        "high": ("high", "h"),
        "low": ("low", "l"),
        "close": ("close", "c"),
        "volume": ("volume", "v"),
    }

    def __post_init__(self, timestamp_raw: int | float | str | datetime) -> None:
        """Convert timestamp to integer seconds after initialization.

        Args:
            timestamp_raw: Raw timestamp input in various formats
        """
        self.timestamp = self._normalize_timestamp(timestamp_raw)

    @staticmethod
    def _normalize_timestamp(ts: int | float | str | datetime) -> int:
        """Convert various timestamp formats to integer seconds.

        Args:
            ts: Timestamp in various formats

        Returns:
            Timestamp as integer seconds

        Raises:
            ValueError: If timestamp cannot be converted
        """
        if isinstance(ts, int):
            # Handle milliseconds/microseconds timestamps
            if ts > 10000000000:  # Likely milliseconds or microseconds
                return ts // 1000000 if ts > 10000000000000 else ts // 1000
            return ts
        elif isinstance(ts, float):
            # Handle floating point timestamps (potentially with fractional seconds)
            if ts > 10000000000:  # Likely milliseconds or microseconds
                return int(ts) // 1000000 if ts > 10000000000000 else int(ts) // 1000
            return int(ts)
        elif isinstance(ts, str):
            try:
                # Try parsing as Unix timestamp first
                return CandleData._normalize_timestamp(float(ts))
            except ValueError:
                try:
                    # Try parsing as ISO format
                    dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
                    return CandleData.to_utc_seconds(dt)
                except ValueError as e:
                    raise ValueError(f"Could not parse timestamp string: {ts}") from e
        elif isinstance(ts, datetime):
            return CandleData.to_utc_seconds(ts)
        else:
            raise ValueError(f"Unsupported timestamp type: {type(ts)}")

    @staticmethod
    def to_utc_seconds(dt: datetime) -> int:
        """Convert datetime to UTC timestamp in seconds.

        Args:
            dt: Datetime to convert

        Returns:
            UTC timestamp in seconds
        """
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return int(dt.astimezone(timezone.utc).timestamp())

    def to_array(self) -> list[float]:
        """Convert to array format for backward compatibility.

        Returns:
            List of candle values
        """
        return [
            float(self.timestamp),
            self.open,
            self.high,
            self.low,
            self.close,
            self.volume,
            self.quote_asset_volume,
            self.n_trades,
            self.taker_buy_base_volume,
            self.taker_buy_quote_volume,
        ]

    @classmethod
    def from_array(cls, data: list[float]) -> "CandleData":
        """Create from array format for backward compatibility.

        Args:
            data: Array of candle values

        Returns:
            CandleData instance
        """
        return cls(
            timestamp_raw=data[0],
            open=data[1],
            high=data[2],
            low=data[3],
            close=data[4],
            volume=data[5],
            quote_asset_volume=data[6],
            n_trades=int(data[7]),
            taker_buy_base_volume=data[8],
            taker_buy_quote_volume=data[9],
        )

    @classmethod
    def from_dict(cls, data: dict) -> "CandleData":
        """Create CandleData from a dictionary.

        Args:
            data: Dictionary containing candle data

        Returns:
            CandleData instance

        Raises:
            ValueError: If required fields are missing or invalid
        """
        timestamp_raw = next((data[key] for key in cls._timestamp_keys if key in data), None)
        if timestamp_raw is None:
            raise ValueError(f"No timestamp found in keys: {cls._timestamp_keys}")

        # Find price values
        values: dict[str, float | int] = {}
        for f, keys in cls._price_keys.items():
            value = next((float(data[key]) for key in keys if key in data), None)
            if value is None:
                raise ValueError(f"No {f} value found in keys: {keys}")
            values[f] = value

        # Return with explicit parameters instead of unpacking dictionaries
        return cls(
            timestamp_raw=timestamp_raw,
            open=values["open"],
            high=values["high"],
            low=values["low"],
            close=values["close"],
            volume=values["volume"],
            quote_asset_volume=float(data.get("quote_asset_volume", 0)),
            n_trades=int(data.get("n_trades", 0)),
            taker_buy_base_volume=float(data.get("taker_buy_base_volume", 0)),
            taker_buy_quote_volume=float(data.get("taker_buy_quote_volume", 0)),
        )

timestamp_ms property

Convert timestamp to milliseconds.

__post_init__(timestamp_raw)

Convert timestamp to integer seconds after initialization.

Parameters:

Name Type Description Default
timestamp_raw int | float | str | datetime

Raw timestamp input in various formats

required
Source code in candles_feed/core/candle_data.py
57
58
59
60
61
62
63
def __post_init__(self, timestamp_raw: int | float | str | datetime) -> None:
    """Convert timestamp to integer seconds after initialization.

    Args:
        timestamp_raw: Raw timestamp input in various formats
    """
    self.timestamp = self._normalize_timestamp(timestamp_raw)

_normalize_timestamp(ts) staticmethod

Convert various timestamp formats to integer seconds.

Parameters:

Name Type Description Default
ts int | float | str | datetime

Timestamp in various formats

required

Returns:

Type Description
int

Timestamp as integer seconds

Raises:

Type Description
ValueError

If timestamp cannot be converted

Source code in candles_feed/core/candle_data.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@staticmethod
def _normalize_timestamp(ts: int | float | str | datetime) -> int:
    """Convert various timestamp formats to integer seconds.

    Args:
        ts: Timestamp in various formats

    Returns:
        Timestamp as integer seconds

    Raises:
        ValueError: If timestamp cannot be converted
    """
    if isinstance(ts, int):
        # Handle milliseconds/microseconds timestamps
        if ts > 10000000000:  # Likely milliseconds or microseconds
            return ts // 1000000 if ts > 10000000000000 else ts // 1000
        return ts
    elif isinstance(ts, float):
        # Handle floating point timestamps (potentially with fractional seconds)
        if ts > 10000000000:  # Likely milliseconds or microseconds
            return int(ts) // 1000000 if ts > 10000000000000 else int(ts) // 1000
        return int(ts)
    elif isinstance(ts, str):
        try:
            # Try parsing as Unix timestamp first
            return CandleData._normalize_timestamp(float(ts))
        except ValueError:
            try:
                # Try parsing as ISO format
                dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
                return CandleData.to_utc_seconds(dt)
            except ValueError as e:
                raise ValueError(f"Could not parse timestamp string: {ts}") from e
    elif isinstance(ts, datetime):
        return CandleData.to_utc_seconds(ts)
    else:
        raise ValueError(f"Unsupported timestamp type: {type(ts)}")

from_array(data) classmethod

Create from array format for backward compatibility.

Parameters:

Name Type Description Default
data list[float]

Array of candle values

required

Returns:

Type Description
CandleData

CandleData instance

Source code in candles_feed/core/candle_data.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@classmethod
def from_array(cls, data: list[float]) -> "CandleData":
    """Create from array format for backward compatibility.

    Args:
        data: Array of candle values

    Returns:
        CandleData instance
    """
    return cls(
        timestamp_raw=data[0],
        open=data[1],
        high=data[2],
        low=data[3],
        close=data[4],
        volume=data[5],
        quote_asset_volume=data[6],
        n_trades=int(data[7]),
        taker_buy_base_volume=data[8],
        taker_buy_quote_volume=data[9],
    )

from_dict(data) classmethod

Create CandleData from a dictionary.

Parameters:

Name Type Description Default
data dict

Dictionary containing candle data

required

Returns:

Type Description
CandleData

CandleData instance

Raises:

Type Description
ValueError

If required fields are missing or invalid

Source code in candles_feed/core/candle_data.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@classmethod
def from_dict(cls, data: dict) -> "CandleData":
    """Create CandleData from a dictionary.

    Args:
        data: Dictionary containing candle data

    Returns:
        CandleData instance

    Raises:
        ValueError: If required fields are missing or invalid
    """
    timestamp_raw = next((data[key] for key in cls._timestamp_keys if key in data), None)
    if timestamp_raw is None:
        raise ValueError(f"No timestamp found in keys: {cls._timestamp_keys}")

    # Find price values
    values: dict[str, float | int] = {}
    for f, keys in cls._price_keys.items():
        value = next((float(data[key]) for key in keys if key in data), None)
        if value is None:
            raise ValueError(f"No {f} value found in keys: {keys}")
        values[f] = value

    # Return with explicit parameters instead of unpacking dictionaries
    return cls(
        timestamp_raw=timestamp_raw,
        open=values["open"],
        high=values["high"],
        low=values["low"],
        close=values["close"],
        volume=values["volume"],
        quote_asset_volume=float(data.get("quote_asset_volume", 0)),
        n_trades=int(data.get("n_trades", 0)),
        taker_buy_base_volume=float(data.get("taker_buy_base_volume", 0)),
        taker_buy_quote_volume=float(data.get("taker_buy_quote_volume", 0)),
    )

to_array()

Convert to array format for backward compatibility.

Returns:

Type Description
list[float]

List of candle values

Source code in candles_feed/core/candle_data.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def to_array(self) -> list[float]:
    """Convert to array format for backward compatibility.

    Returns:
        List of candle values
    """
    return [
        float(self.timestamp),
        self.open,
        self.high,
        self.low,
        self.close,
        self.volume,
        self.quote_asset_volume,
        self.n_trades,
        self.taker_buy_base_volume,
        self.taker_buy_quote_volume,
    ]

to_utc_seconds(dt) staticmethod

Convert datetime to UTC timestamp in seconds.

Parameters:

Name Type Description Default
dt datetime

Datetime to convert

required

Returns:

Type Description
int

UTC timestamp in seconds

Source code in candles_feed/core/candle_data.py
104
105
106
107
108
109
110
111
112
113
114
115
116
@staticmethod
def to_utc_seconds(dt: datetime) -> int:
    """Convert datetime to UTC timestamp in seconds.

    Args:
        dt: Datetime to convert

    Returns:
        UTC timestamp in seconds
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.astimezone(timezone.utc).timestamp())

ExchangeRegistry

Registry for exchange adapters with auto-discovery.

Source code in candles_feed/core/exchange_registry.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
class ExchangeRegistry:
    """Registry for exchange adapters with auto-discovery."""

    # Use both _adapters (new API) and _registry (old API) for compatibility
    _adapters: dict[str, type[CandleDataAdapter]] = {}
    _registry: dict[str, type[CandleDataAdapter]] = {}
    _logger: Logger | None = None

    @classmethod
    def logger(cls) -> Logger:
        """Get the logger.

        Returns:
            Logger instance
        """
        if cls._logger is None:
            cls._logger = logging.getLogger(__name__)
        return cls._logger

    @classmethod
    def register(cls, name: str):
        """Decorator for registering exchange adapters.

        Args:
            name: Adapter name

        Returns:
            Decorator function
        """

        def decorator(adapter_class: type[CandleDataAdapter]):
            cls.logger().info(f"Registering adapter: {name}")
            # Register in both collections for compatibility
            cls._adapters[name] = adapter_class
            cls._registry[name] = adapter_class
            return adapter_class

        return decorator

    @classmethod
    def get_adapter_class(cls, name: str) -> type[CandleDataAdapter]:
        """Get adapter class by name.

        Args:
            name: Adapter name

        Returns:
            Adapter class

        Raises:
            ValueError: If no adapter is registered with the given name
        """
        adapter_class = cls._registry.get(name)
        if adapter_class is None:
            raise ValueError(f"Adapter not found for exchange: {name}")
        return adapter_class

    @classmethod
    def get_adapter(cls, name: str) -> CandleDataAdapter:
        """Get adapter instance by name.

        Args:
            name: Adapter name

        Returns:
            Adapter instance

        Raises:
            ValueError: If no adapter is registered with the given name
        """
        adapter_class = cls.get_adapter_class(name)
        cls.logger().debug(f"Creating adapter instance: {name}")
        return adapter_class()

    @classmethod
    def get_adapter_instance(cls, name: str, *args, **kwargs) -> CandleDataAdapter:
        """Get adapter instance by name with custom args.

        Args:
            name: Adapter name
            *args: Positional arguments to pass to the adapter constructor
            **kwargs: Keyword arguments to pass to the adapter constructor

        Returns:
            Adapter instance
        """
        adapter_class = cls.get_adapter_class(name)
        cls.logger().debug(f"Creating adapter instance with args: {name}")
        return adapter_class(*args, **kwargs)

    @classmethod
    def get_registered_exchanges(cls) -> list[str]:
        """Get list of registered exchange names.

        Returns:
            List of registered exchange names
        """
        return list(cls._registry.keys())

    @classmethod
    def discover_adapters(cls, package_path: str | None = None) -> None:
        """Discover and register adapters in the given package.

        Args:
            package_path: Path to package to search for adapters
        """
        if package_path is None:
            # Default to candles_feed adapters directory
            package_path = os.path.abspath(
                os.path.join(os.path.dirname(__file__), "..", "adapters")
            )

        cls.logger().info(f"Discovering adapters in: {package_path}")

        # Import all modules in the package to trigger decorator registration
        import_path = os.path.basename(os.path.dirname(package_path))
        if import_path:
            import_path = f"{import_path}."
        import_path += os.path.basename(package_path)

        for _, name, is_pkg in pkgutil.iter_modules([package_path]):
            if is_pkg:
                # This is a potential exchange adapter package
                try:
                    cls.logger().debug(f"Importing potential adapter: {import_path}.{name}")
                    importlib.import_module(f"{import_path}.{name}")
                except ImportError as e:
                    cls.logger().error(f"Error importing {name}: {e}")

    @classmethod
    def list_available_adapters(cls) -> list[str]:
        """List all registered adapter names.

        Returns:
            List of adapter names
        """
        return list(cls._adapters.keys())

    @classmethod
    def list_available_markets(cls) -> dict[str, list[str]]:
        """List all available markets by adapter.

        Returns:
            Dictionary mapping adapter names to list of supported trading pairs
        """
        result = {}
        for name, adapter_class in cls._adapters.items():
            try:
                adapter = adapter_class()
                # Some adapters may implement a get_available_markets method
                if hasattr(adapter, "get_available_markets"):
                    result[name] = adapter.get_available_markets()
                else:
                    result[name] = []
            except Exception as e:
                cls.logger().error(f"Error getting markets for {name}: {e}")
                result[name] = []

        return result

discover_adapters(package_path=None) classmethod

Discover and register adapters in the given package.

Parameters:

Name Type Description Default
package_path str | None

Path to package to search for adapters

None
Source code in candles_feed/core/exchange_registry.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@classmethod
def discover_adapters(cls, package_path: str | None = None) -> None:
    """Discover and register adapters in the given package.

    Args:
        package_path: Path to package to search for adapters
    """
    if package_path is None:
        # Default to candles_feed adapters directory
        package_path = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "..", "adapters")
        )

    cls.logger().info(f"Discovering adapters in: {package_path}")

    # Import all modules in the package to trigger decorator registration
    import_path = os.path.basename(os.path.dirname(package_path))
    if import_path:
        import_path = f"{import_path}."
    import_path += os.path.basename(package_path)

    for _, name, is_pkg in pkgutil.iter_modules([package_path]):
        if is_pkg:
            # This is a potential exchange adapter package
            try:
                cls.logger().debug(f"Importing potential adapter: {import_path}.{name}")
                importlib.import_module(f"{import_path}.{name}")
            except ImportError as e:
                cls.logger().error(f"Error importing {name}: {e}")

get_adapter(name) classmethod

Get adapter instance by name.

Parameters:

Name Type Description Default
name str

Adapter name

required

Returns:

Type Description
CandleDataAdapter

Adapter instance

Raises:

Type Description
ValueError

If no adapter is registered with the given name

Source code in candles_feed/core/exchange_registry.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@classmethod
def get_adapter(cls, name: str) -> CandleDataAdapter:
    """Get adapter instance by name.

    Args:
        name: Adapter name

    Returns:
        Adapter instance

    Raises:
        ValueError: If no adapter is registered with the given name
    """
    adapter_class = cls.get_adapter_class(name)
    cls.logger().debug(f"Creating adapter instance: {name}")
    return adapter_class()

get_adapter_class(name) classmethod

Get adapter class by name.

Parameters:

Name Type Description Default
name str

Adapter name

required

Returns:

Type Description
type[CandleDataAdapter]

Adapter class

Raises:

Type Description
ValueError

If no adapter is registered with the given name

Source code in candles_feed/core/exchange_registry.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@classmethod
def get_adapter_class(cls, name: str) -> type[CandleDataAdapter]:
    """Get adapter class by name.

    Args:
        name: Adapter name

    Returns:
        Adapter class

    Raises:
        ValueError: If no adapter is registered with the given name
    """
    adapter_class = cls._registry.get(name)
    if adapter_class is None:
        raise ValueError(f"Adapter not found for exchange: {name}")
    return adapter_class

get_adapter_instance(name, *args, **kwargs) classmethod

Get adapter instance by name with custom args.

Parameters:

Name Type Description Default
name str

Adapter name

required
*args

Positional arguments to pass to the adapter constructor

()
**kwargs

Keyword arguments to pass to the adapter constructor

{}

Returns:

Type Description
CandleDataAdapter

Adapter instance

Source code in candles_feed/core/exchange_registry.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@classmethod
def get_adapter_instance(cls, name: str, *args, **kwargs) -> CandleDataAdapter:
    """Get adapter instance by name with custom args.

    Args:
        name: Adapter name
        *args: Positional arguments to pass to the adapter constructor
        **kwargs: Keyword arguments to pass to the adapter constructor

    Returns:
        Adapter instance
    """
    adapter_class = cls.get_adapter_class(name)
    cls.logger().debug(f"Creating adapter instance with args: {name}")
    return adapter_class(*args, **kwargs)

get_registered_exchanges() classmethod

Get list of registered exchange names.

Returns:

Type Description
list[str]

List of registered exchange names

Source code in candles_feed/core/exchange_registry.py
107
108
109
110
111
112
113
114
@classmethod
def get_registered_exchanges(cls) -> list[str]:
    """Get list of registered exchange names.

    Returns:
        List of registered exchange names
    """
    return list(cls._registry.keys())

list_available_adapters() classmethod

List all registered adapter names.

Returns:

Type Description
list[str]

List of adapter names

Source code in candles_feed/core/exchange_registry.py
146
147
148
149
150
151
152
153
@classmethod
def list_available_adapters(cls) -> list[str]:
    """List all registered adapter names.

    Returns:
        List of adapter names
    """
    return list(cls._adapters.keys())

list_available_markets() classmethod

List all available markets by adapter.

Returns:

Type Description
dict[str, list[str]]

Dictionary mapping adapter names to list of supported trading pairs

Source code in candles_feed/core/exchange_registry.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
@classmethod
def list_available_markets(cls) -> dict[str, list[str]]:
    """List all available markets by adapter.

    Returns:
        Dictionary mapping adapter names to list of supported trading pairs
    """
    result = {}
    for name, adapter_class in cls._adapters.items():
        try:
            adapter = adapter_class()
            # Some adapters may implement a get_available_markets method
            if hasattr(adapter, "get_available_markets"):
                result[name] = adapter.get_available_markets()
            else:
                result[name] = []
        except Exception as e:
            cls.logger().error(f"Error getting markets for {name}: {e}")
            result[name] = []

    return result

logger() classmethod

Get the logger.

Returns:

Type Description
Logger

Logger instance

Source code in candles_feed/core/exchange_registry.py
25
26
27
28
29
30
31
32
33
34
@classmethod
def logger(cls) -> Logger:
    """Get the logger.

    Returns:
        Logger instance
    """
    if cls._logger is None:
        cls._logger = logging.getLogger(__name__)
    return cls._logger

register(name) classmethod

Decorator for registering exchange adapters.

Parameters:

Name Type Description Default
name str

Adapter name

required

Returns:

Type Description

Decorator function

Source code in candles_feed/core/exchange_registry.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@classmethod
def register(cls, name: str):
    """Decorator for registering exchange adapters.

    Args:
        name: Adapter name

    Returns:
        Decorator function
    """

    def decorator(adapter_class: type[CandleDataAdapter]):
        cls.logger().info(f"Registering adapter: {name}")
        # Register in both collections for compatibility
        cls._adapters[name] = adapter_class
        cls._registry[name] = adapter_class
        return adapter_class

    return decorator

NetworkClient

Handles network communication with exchanges.

This class provides methods for communicating with exchange APIs, handling both REST and WebSocket connections.

Source code in candles_feed/core/network_client.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class NetworkClient:
    """Handles network communication with exchanges.

    This class provides methods for communicating with exchange APIs,
    handling both REST and WebSocket connections.
    """

    def __init__(self, logger: Logger | None = None):
        """Initialize the NetworkClient.

        Args:
            logger: Logger instance
        """
        self.logger: Logger = logger or logging.getLogger(__name__)
        self._session: aiohttp.ClientSession | None = None

    async def _ensure_session(self):
        """Ensure aiohttp session exists."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()

    async def close(self):
        """Close the client session.

        Should be called when the network client is no longer needed
        to properly clean up resources.
        """
        if self._session and not self._session.closed:
            await self._session.close()
            self._session = None

    async def __aenter__(self):
        """Async context manager enter method."""
        await self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit method."""
        await self.close()

    async def get_rest_data(
        self,
        url: str,
        params: dict[str, Any] | None = None,
        data: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        method: str = "GET",
    ) -> Any:
        """Get data from REST API.

        Args:
            url: REST API URL
            params: Query parameters
            data: Request body data
            headers: Request headers
            method: HTTP method

        Returns:
            REST API response

        Raises:
            Exception: If the request fails
        """
        await self._ensure_session()
        assert self._session is not None

        self.logger.debug(f"Making REST request to {url} with params {params}")

        # Clean params, removing None values to avoid serialization issues
        cleaned_params = None
        if params is not None:
            cleaned_params = {k: v for k, v in params.items() if v is not None}

        # Clean data, removing None values
        cleaned_data = None
        if data is not None:
            cleaned_data = {k: v for k, v in data.items() if v is not None}

        try:
            async with self._session.request(
                method=method, url=url, params=cleaned_params, json=cleaned_data, headers=headers
            ) as response:
                response.raise_for_status()
                return await response.json()
        except Exception as e:
            self.logger.error(f"REST request failed: {e}")
            raise

    async def establish_ws_connection(self, url: str) -> WSAssistant:
        """Establish a websocket connection.

        Args:
            url: WebSocket URL

        Returns:
            WSAssistant instance

        Raises:
            Exception: If the connection fails
        """
        await self._ensure_session()
        assert self._session is not None

        self.logger.debug(f"Establishing WebSocket connection to {url}")

        try:
            ws = await self._session.ws_connect(url=url)
            return SimpleWSAssistant(ws, self.logger)
        except Exception as e:
            self.logger.error(f"WebSocket connection failed: {e}")
            raise

    async def send_ws_message(self, ws_assistant: WSAssistant, payload: dict[str, Any]) -> None:
        """Send a message over WebSocket.

        Args:
            ws_assistant: WebSocket assistant
            payload: Message payload

        Raises:
            Exception: If sending fails
        """
        try:
            await ws_assistant.send(payload)
        except Exception as e:
            self.logger.error(f"Failed to send WebSocket message: {e}")
            raise

__aenter__() async

Async context manager enter method.

Source code in candles_feed/core/network_client.py
46
47
48
49
async def __aenter__(self):
    """Async context manager enter method."""
    await self._ensure_session()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit method.

Source code in candles_feed/core/network_client.py
51
52
53
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Async context manager exit method."""
    await self.close()

__init__(logger=None)

Initialize the NetworkClient.

Parameters:

Name Type Description Default
logger Logger | None

Logger instance

None
Source code in candles_feed/core/network_client.py
22
23
24
25
26
27
28
29
def __init__(self, logger: Logger | None = None):
    """Initialize the NetworkClient.

    Args:
        logger: Logger instance
    """
    self.logger: Logger = logger or logging.getLogger(__name__)
    self._session: aiohttp.ClientSession | None = None

_ensure_session() async

Ensure aiohttp session exists.

Source code in candles_feed/core/network_client.py
31
32
33
34
async def _ensure_session(self):
    """Ensure aiohttp session exists."""
    if self._session is None or self._session.closed:
        self._session = aiohttp.ClientSession()

close() async

Close the client session.

Should be called when the network client is no longer needed to properly clean up resources.

Source code in candles_feed/core/network_client.py
36
37
38
39
40
41
42
43
44
async def close(self):
    """Close the client session.

    Should be called when the network client is no longer needed
    to properly clean up resources.
    """
    if self._session and not self._session.closed:
        await self._session.close()
        self._session = None

establish_ws_connection(url) async

Establish a websocket connection.

Parameters:

Name Type Description Default
url str

WebSocket URL

required

Returns:

Type Description
WSAssistant

WSAssistant instance

Raises:

Type Description
Exception

If the connection fails

Source code in candles_feed/core/network_client.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def establish_ws_connection(self, url: str) -> WSAssistant:
    """Establish a websocket connection.

    Args:
        url: WebSocket URL

    Returns:
        WSAssistant instance

    Raises:
        Exception: If the connection fails
    """
    await self._ensure_session()
    assert self._session is not None

    self.logger.debug(f"Establishing WebSocket connection to {url}")

    try:
        ws = await self._session.ws_connect(url=url)
        return SimpleWSAssistant(ws, self.logger)
    except Exception as e:
        self.logger.error(f"WebSocket connection failed: {e}")
        raise

get_rest_data(url, params=None, data=None, headers=None, method='GET') async

Get data from REST API.

Parameters:

Name Type Description Default
url str

REST API URL

required
params dict[str, Any] | None

Query parameters

None
data dict[str, Any] | None

Request body data

None
headers dict[str, str] | None

Request headers

None
method str

HTTP method

'GET'

Returns:

Type Description
Any

REST API response

Raises:

Type Description
Exception

If the request fails

Source code in candles_feed/core/network_client.py
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def get_rest_data(
    self,
    url: str,
    params: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    method: str = "GET",
) -> Any:
    """Get data from REST API.

    Args:
        url: REST API URL
        params: Query parameters
        data: Request body data
        headers: Request headers
        method: HTTP method

    Returns:
        REST API response

    Raises:
        Exception: If the request fails
    """
    await self._ensure_session()
    assert self._session is not None

    self.logger.debug(f"Making REST request to {url} with params {params}")

    # Clean params, removing None values to avoid serialization issues
    cleaned_params = None
    if params is not None:
        cleaned_params = {k: v for k, v in params.items() if v is not None}

    # Clean data, removing None values
    cleaned_data = None
    if data is not None:
        cleaned_data = {k: v for k, v in data.items() if v is not None}

    try:
        async with self._session.request(
            method=method, url=url, params=cleaned_params, json=cleaned_data, headers=headers
        ) as response:
            response.raise_for_status()
            return await response.json()
    except Exception as e:
        self.logger.error(f"REST request failed: {e}")
        raise

send_ws_message(ws_assistant, payload) async

Send a message over WebSocket.

Parameters:

Name Type Description Default
ws_assistant WSAssistant

WebSocket assistant

required
payload dict[str, Any]

Message payload

required

Raises:

Type Description
Exception

If sending fails

Source code in candles_feed/core/network_client.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
async def send_ws_message(self, ws_assistant: WSAssistant, payload: dict[str, Any]) -> None:
    """Send a message over WebSocket.

    Args:
        ws_assistant: WebSocket assistant
        payload: Message payload

    Raises:
        Exception: If sending fails
    """
    try:
        await ws_assistant.send(payload)
    except Exception as e:
        self.logger.error(f"Failed to send WebSocket message: {e}")
        raise

NetworkStrategies

WebSocketStrategy

Implementation for websocket-based candle retrieval.

Source code in candles_feed/core/network_strategies.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class WebSocketStrategy:
    """Implementation for websocket-based candle retrieval."""

    def __init__(
        self,
        network_client: NetworkClient,
        adapter: CandleDataAdapter,
        trading_pair: str,
        interval: str,
        data_processor: DataProcessor,
        candles_store: Deque[CandleData],
        logger: Logger | None = None,
    ):
        """Initialize the WebSocketStrategy.

        Args:
            network_client: Network client for API communication
            adapter: Exchange adapter
            trading_pair: Trading pair
            interval: Candle interval
            data_processor: Data processor
            candles_store: Deque for storing candles
            logger: Logger instance
        """
        self.network_client = network_client
        self.adapter = adapter
        self.trading_pair = trading_pair
        self.interval = interval
        self.data_processor = data_processor
        self._candles = candles_store
        self.logger = logger or logging.getLogger(__name__)
        self._ws_assistant: WSAssistant | None = None
        self._listen_task: asyncio.Task | None = None
        self._running = False
        self._ready_event = asyncio.Event()

    async def start(self) -> None:
        """Start listening for websocket updates."""
        if not self._running:
            self._running = True
            self._listen_task = asyncio.create_task(self._listen_for_updates())

    async def stop(self) -> None:
        """Stop listening for websocket updates."""
        self._running = False

        if self._listen_task:
            self._listen_task.cancel()
            self._listen_task = None

        if self._ws_assistant:
            await self._ws_assistant.disconnect()
            self._ws_assistant = None

    async def poll_once(
        self,
        start_time: int | None = None,
        end_time: int | None = None,
        limit: int | None = None,
    ) -> list[CandleData]:
        """
        Fetch candles for a specific time range (one-time poll).

        For WebSocket strategy, this falls back to REST API for historical data.

        Args:
            start_time: Start time in seconds
            end_time: End time in seconds
            limit: Maximum number of candles to return

        Returns:
            List of CandleData objects
        """
        # For historical data, we need to use REST API
        url = self.adapter.get_rest_url()
        params = self.adapter.get_rest_params(
            trading_pair=self.trading_pair,
            interval=self.interval,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
        )

        try:
            response = await self.network_client.get_rest_data(url=url, params=params)

            candles = self.adapter.parse_rest_response(response)
            interval_seconds = self.adapter.get_supported_intervals()[self.interval]

            processed_candles = self.data_processor.sanitize_candles(candles, interval_seconds)

            return processed_candles

        except Exception as e:
            self.logger.error(f"Error fetching candles via REST: {e}")
            return []

    async def _listen_for_updates(self) -> None:
        """Listen for websocket updates."""
        # If we have no initial data, fetch it via REST API
        if not self._candles:
            await self._initialize_candles()
        else:
            self._ready_event.set()

        while self._running:
            try:
                ws_url = self.adapter.get_ws_url()
                self._ws_assistant = await self.network_client.establish_ws_connection(ws_url)

                # Subscribe to candle updates
                payload = self.adapter.get_ws_subscription_payload(self.trading_pair, self.interval)
                await self.network_client.send_ws_message(self._ws_assistant, payload)

                # Process incoming messages
                async for message in self._ws_assistant.iter_messages():
                    if not self._running:
                        break

                    candles = self.adapter.parse_ws_message(message)
                    if candles:
                        interval_seconds = self.adapter.get_supported_intervals()[self.interval]
                        validated_candles = self.data_processor.sanitize_candles(
                            candles, interval_seconds
                        )
                        self._update_candles(validated_candles)

            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.logger.exception(f"Error in websocket connection: {e}")
                # If we have a connection, try to disconnect
                if self._ws_assistant:
                    try:
                        await self._ws_assistant.disconnect()
                    except Exception:  # Use explicit Exception instead of bare except
                        pass
                    finally:
                        self._ws_assistant = None

                if self._running:
                    await asyncio.sleep(1.0)

    async def _initialize_candles(self) -> None:
        """Initialize candles using REST API."""
        try:
            # Get enough candles to fill the store
            limit = self._candles.maxlen
            candles = await self.poll_once(limit=limit)
            if candles:
                for candle in candles:
                    self._candles.append(candle)
                self._ready_event.set()
            else:
                self.logger.warning("Failed to initialize candles, will retry")
        except Exception as e:
            self.logger.error(f"Error initializing candles: {e}")

    def _update_candles(self, new_candles: list[CandleData]) -> None:
        """Update the candles store with new data.

        Args:
            new_candles: New candles to update
        """
        if not new_candles:
            return

        # Either update existing candle or append new one
        for candle in new_candles:
            self.data_processor.process_candle(candle, self._candles)

__init__(network_client, adapter, trading_pair, interval, data_processor, candles_store, logger=None)

Initialize the WebSocketStrategy.

Parameters:

Name Type Description Default
network_client NetworkClient

Network client for API communication

required
adapter CandleDataAdapter

Exchange adapter

required
trading_pair str

Trading pair

required
interval str

Candle interval

required
data_processor DataProcessor

Data processor

required
candles_store Deque[CandleData]

Deque for storing candles

required
logger Logger | None

Logger instance

None
Source code in candles_feed/core/network_strategies.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(
    self,
    network_client: NetworkClient,
    adapter: CandleDataAdapter,
    trading_pair: str,
    interval: str,
    data_processor: DataProcessor,
    candles_store: Deque[CandleData],
    logger: Logger | None = None,
):
    """Initialize the WebSocketStrategy.

    Args:
        network_client: Network client for API communication
        adapter: Exchange adapter
        trading_pair: Trading pair
        interval: Candle interval
        data_processor: Data processor
        candles_store: Deque for storing candles
        logger: Logger instance
    """
    self.network_client = network_client
    self.adapter = adapter
    self.trading_pair = trading_pair
    self.interval = interval
    self.data_processor = data_processor
    self._candles = candles_store
    self.logger = logger or logging.getLogger(__name__)
    self._ws_assistant: WSAssistant | None = None
    self._listen_task: asyncio.Task | None = None
    self._running = False
    self._ready_event = asyncio.Event()

_initialize_candles() async

Initialize candles using REST API.

Source code in candles_feed/core/network_strategies.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
async def _initialize_candles(self) -> None:
    """Initialize candles using REST API."""
    try:
        # Get enough candles to fill the store
        limit = self._candles.maxlen
        candles = await self.poll_once(limit=limit)
        if candles:
            for candle in candles:
                self._candles.append(candle)
            self._ready_event.set()
        else:
            self.logger.warning("Failed to initialize candles, will retry")
    except Exception as e:
        self.logger.error(f"Error initializing candles: {e}")

_listen_for_updates() async

Listen for websocket updates.

Source code in candles_feed/core/network_strategies.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def _listen_for_updates(self) -> None:
    """Listen for websocket updates."""
    # If we have no initial data, fetch it via REST API
    if not self._candles:
        await self._initialize_candles()
    else:
        self._ready_event.set()

    while self._running:
        try:
            ws_url = self.adapter.get_ws_url()
            self._ws_assistant = await self.network_client.establish_ws_connection(ws_url)

            # Subscribe to candle updates
            payload = self.adapter.get_ws_subscription_payload(self.trading_pair, self.interval)
            await self.network_client.send_ws_message(self._ws_assistant, payload)

            # Process incoming messages
            async for message in self._ws_assistant.iter_messages():
                if not self._running:
                    break

                candles = self.adapter.parse_ws_message(message)
                if candles:
                    interval_seconds = self.adapter.get_supported_intervals()[self.interval]
                    validated_candles = self.data_processor.sanitize_candles(
                        candles, interval_seconds
                    )
                    self._update_candles(validated_candles)

        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.logger.exception(f"Error in websocket connection: {e}")
            # If we have a connection, try to disconnect
            if self._ws_assistant:
                try:
                    await self._ws_assistant.disconnect()
                except Exception:  # Use explicit Exception instead of bare except
                    pass
                finally:
                    self._ws_assistant = None

            if self._running:
                await asyncio.sleep(1.0)

_update_candles(new_candles)

Update the candles store with new data.

Parameters:

Name Type Description Default
new_candles list[CandleData]

New candles to update

required
Source code in candles_feed/core/network_strategies.py
178
179
180
181
182
183
184
185
186
187
188
189
def _update_candles(self, new_candles: list[CandleData]) -> None:
    """Update the candles store with new data.

    Args:
        new_candles: New candles to update
    """
    if not new_candles:
        return

    # Either update existing candle or append new one
    for candle in new_candles:
        self.data_processor.process_candle(candle, self._candles)

poll_once(start_time=None, end_time=None, limit=None) async

Fetch candles for a specific time range (one-time poll).

For WebSocket strategy, this falls back to REST API for historical data.

Parameters:

Name Type Description Default
start_time int | None

Start time in seconds

None
end_time int | None

End time in seconds

None
limit int | None

Maximum number of candles to return

None

Returns:

Type Description
list[CandleData]

List of CandleData objects

Source code in candles_feed/core/network_strategies.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
async def poll_once(
    self,
    start_time: int | None = None,
    end_time: int | None = None,
    limit: int | None = None,
) -> list[CandleData]:
    """
    Fetch candles for a specific time range (one-time poll).

    For WebSocket strategy, this falls back to REST API for historical data.

    Args:
        start_time: Start time in seconds
        end_time: End time in seconds
        limit: Maximum number of candles to return

    Returns:
        List of CandleData objects
    """
    # For historical data, we need to use REST API
    url = self.adapter.get_rest_url()
    params = self.adapter.get_rest_params(
        trading_pair=self.trading_pair,
        interval=self.interval,
        start_time=start_time,
        end_time=end_time,
        limit=limit,
    )

    try:
        response = await self.network_client.get_rest_data(url=url, params=params)

        candles = self.adapter.parse_rest_response(response)
        interval_seconds = self.adapter.get_supported_intervals()[self.interval]

        processed_candles = self.data_processor.sanitize_candles(candles, interval_seconds)

        return processed_candles

    except Exception as e:
        self.logger.error(f"Error fetching candles via REST: {e}")
        return []

start() async

Start listening for websocket updates.

Source code in candles_feed/core/network_strategies.py
56
57
58
59
60
async def start(self) -> None:
    """Start listening for websocket updates."""
    if not self._running:
        self._running = True
        self._listen_task = asyncio.create_task(self._listen_for_updates())

stop() async

Stop listening for websocket updates.

Source code in candles_feed/core/network_strategies.py
62
63
64
65
66
67
68
69
70
71
72
async def stop(self) -> None:
    """Stop listening for websocket updates."""
    self._running = False

    if self._listen_task:
        self._listen_task.cancel()
        self._listen_task = None

    if self._ws_assistant:
        await self._ws_assistant.disconnect()
        self._ws_assistant = None

RESTPollingStrategy

Implementation for REST-based polling candle retrieval.

Source code in candles_feed/core/network_strategies.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
class RESTPollingStrategy:
    """Implementation for REST-based polling candle retrieval."""

    def __init__(
        self,
        network_client: NetworkClient,
        adapter: CandleDataAdapter,
        trading_pair: str,
        interval: str,
        data_processor: DataProcessor,
        candles_store: Deque[CandleData],
        logger: Logger | None = None,
    ):
        """Initialize the RESTPollingStrategy.

        Args:
            network_client: Network client for API communication
            adapter: Exchange adapter
            trading_pair: Trading pair
            interval: Candle interval
            data_processor: Data processor
            candles_store: Deque for storing candles
            logger: Logger instance
        """
        self.network_client = network_client
        self.adapter = adapter
        self.trading_pair = trading_pair
        self.interval = interval
        self.data_processor = data_processor
        self._candles = candles_store
        self.logger = logger or logging.getLogger(__name__)
        self._polling_task: asyncio.Task | None = None
        self._polling_interval = adapter.get_supported_intervals()[interval]
        self._running = False
        self._ready_event = asyncio.Event()

    async def start(self) -> None:
        """Start polling for updates."""
        if not self._running:
            self._running = True
            self._polling_task = asyncio.create_task(self._poll_for_updates())

    async def stop(self) -> None:
        """Stop polling for updates."""
        self._running = False
        if self._polling_task:
            self._polling_task.cancel()
            self._polling_task = None

        # Make sure we clean up any remaining network resources
        # The parent CandlesFeed class will close the network client

    async def poll_once(
        self,
        start_time: int | None = None,
        end_time: int | None = None,
        limit: int | None = None,
    ) -> list[CandleData]:
        """Fetch candles for a specific time range (one-time poll).

        Args:
            start_time: Start time in seconds
            end_time: End time in seconds
            limit: Maximum number of candles to return

        Returns:
            List of CandleData objects
        """
        try:
            # Adjust start/end time to align with intervals
            interval_seconds = self.adapter.get_supported_intervals()[self.interval]

            # Calculate proper parameters
            if end_time is None:
                end_time = int(time.time())

            # Round down to nearest interval
            end_time = end_time - (end_time % interval_seconds)

            if start_time is None and limit is not None:
                start_time = end_time - (limit * interval_seconds)
            elif start_time is not None:
                start_time = start_time - (start_time % interval_seconds)

            # Get REST parameters from adapter
            params = self.adapter.get_rest_params(
                trading_pair=self.trading_pair,
                interval=self.interval,
                start_time=start_time,
                end_time=end_time,
                limit=limit,
            )

            # Execute request
            url = self.adapter.get_rest_url()
            response = await self.network_client.get_rest_data(url=url, params=params)

            # Parse and process response
            candles = self.adapter.parse_rest_response(response)
            return self.data_processor.sanitize_candles(candles, interval_seconds)

        except Exception as e:
            self.logger.exception(f"Error fetching candles: {e}")
            return []

    async def _poll_for_updates(self) -> None:
        """Poll for updates at regular intervals."""
        # Initial fetch to populate the store
        if not self._candles:
            initial_candles = await self.poll_once(limit=self._candles.maxlen)
            if initial_candles:
                # Add all candles to the store
                for candle in initial_candles:
                    self._candles.append(candle)
                self._ready_event.set()
        else:
            self._ready_event.set()

        while self._running:
            try:
                # Calculate parameters for incremental updates
                if self._candles:
                    # Get data since the last candle, excluding the last one
                    # which might be incomplete
                    if len(self._candles) > 1:
                        # If we have more than one candle, we can safely slice
                        candles_without_last = list(self._candles)[:-1]
                        last_complete_ts = max(c.timestamp for c in candles_without_last)
                    else:
                        # If we have only one candle, use its timestamp as starting point
                        last_complete_ts = self._candles[0].timestamp

                    # Fetch new or updated candles
                    candles = await self.poll_once(start_time=last_complete_ts)
                    self._update_candles(candles)

                # Sleep until next interval
                sleep_time = max(
                    self._polling_interval / 2, 1
                )  # At least 1s, but prefer half interval
                await asyncio.sleep(sleep_time)

            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.exception(f"Error in polling loop: {e}")
                await asyncio.sleep(1.0)

    def _update_candles(self, new_candles: list[CandleData]) -> None:
        """Update the candles store with new data.

        Args:
            new_candles: New candles to update
        """
        if not new_candles:
            return

        for candle in new_candles:
            self.data_processor.process_candle(candle, self._candles)

__init__(network_client, adapter, trading_pair, interval, data_processor, candles_store, logger=None)

Initialize the RESTPollingStrategy.

Parameters:

Name Type Description Default
network_client NetworkClient

Network client for API communication

required
adapter CandleDataAdapter

Exchange adapter

required
trading_pair str

Trading pair

required
interval str

Candle interval

required
data_processor DataProcessor

Data processor

required
candles_store Deque[CandleData]

Deque for storing candles

required
logger Logger | None

Logger instance

None
Source code in candles_feed/core/network_strategies.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def __init__(
    self,
    network_client: NetworkClient,
    adapter: CandleDataAdapter,
    trading_pair: str,
    interval: str,
    data_processor: DataProcessor,
    candles_store: Deque[CandleData],
    logger: Logger | None = None,
):
    """Initialize the RESTPollingStrategy.

    Args:
        network_client: Network client for API communication
        adapter: Exchange adapter
        trading_pair: Trading pair
        interval: Candle interval
        data_processor: Data processor
        candles_store: Deque for storing candles
        logger: Logger instance
    """
    self.network_client = network_client
    self.adapter = adapter
    self.trading_pair = trading_pair
    self.interval = interval
    self.data_processor = data_processor
    self._candles = candles_store
    self.logger = logger or logging.getLogger(__name__)
    self._polling_task: asyncio.Task | None = None
    self._polling_interval = adapter.get_supported_intervals()[interval]
    self._running = False
    self._ready_event = asyncio.Event()

_poll_for_updates() async

Poll for updates at regular intervals.

Source code in candles_feed/core/network_strategies.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
async def _poll_for_updates(self) -> None:
    """Poll for updates at regular intervals."""
    # Initial fetch to populate the store
    if not self._candles:
        initial_candles = await self.poll_once(limit=self._candles.maxlen)
        if initial_candles:
            # Add all candles to the store
            for candle in initial_candles:
                self._candles.append(candle)
            self._ready_event.set()
    else:
        self._ready_event.set()

    while self._running:
        try:
            # Calculate parameters for incremental updates
            if self._candles:
                # Get data since the last candle, excluding the last one
                # which might be incomplete
                if len(self._candles) > 1:
                    # If we have more than one candle, we can safely slice
                    candles_without_last = list(self._candles)[:-1]
                    last_complete_ts = max(c.timestamp for c in candles_without_last)
                else:
                    # If we have only one candle, use its timestamp as starting point
                    last_complete_ts = self._candles[0].timestamp

                # Fetch new or updated candles
                candles = await self.poll_once(start_time=last_complete_ts)
                self._update_candles(candles)

            # Sleep until next interval
            sleep_time = max(
                self._polling_interval / 2, 1
            )  # At least 1s, but prefer half interval
            await asyncio.sleep(sleep_time)

        except asyncio.CancelledError:
            break
        except Exception as e:
            self.logger.exception(f"Error in polling loop: {e}")
            await asyncio.sleep(1.0)

_update_candles(new_candles)

Update the candles store with new data.

Parameters:

Name Type Description Default
new_candles list[CandleData]

New candles to update

required
Source code in candles_feed/core/network_strategies.py
340
341
342
343
344
345
346
347
348
349
350
def _update_candles(self, new_candles: list[CandleData]) -> None:
    """Update the candles store with new data.

    Args:
        new_candles: New candles to update
    """
    if not new_candles:
        return

    for candle in new_candles:
        self.data_processor.process_candle(candle, self._candles)

poll_once(start_time=None, end_time=None, limit=None) async

Fetch candles for a specific time range (one-time poll).

Parameters:

Name Type Description Default
start_time int | None

Start time in seconds

None
end_time int | None

End time in seconds

None
limit int | None

Maximum number of candles to return

None

Returns:

Type Description
list[CandleData]

List of CandleData objects

Source code in candles_feed/core/network_strategies.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
async def poll_once(
    self,
    start_time: int | None = None,
    end_time: int | None = None,
    limit: int | None = None,
) -> list[CandleData]:
    """Fetch candles for a specific time range (one-time poll).

    Args:
        start_time: Start time in seconds
        end_time: End time in seconds
        limit: Maximum number of candles to return

    Returns:
        List of CandleData objects
    """
    try:
        # Adjust start/end time to align with intervals
        interval_seconds = self.adapter.get_supported_intervals()[self.interval]

        # Calculate proper parameters
        if end_time is None:
            end_time = int(time.time())

        # Round down to nearest interval
        end_time = end_time - (end_time % interval_seconds)

        if start_time is None and limit is not None:
            start_time = end_time - (limit * interval_seconds)
        elif start_time is not None:
            start_time = start_time - (start_time % interval_seconds)

        # Get REST parameters from adapter
        params = self.adapter.get_rest_params(
            trading_pair=self.trading_pair,
            interval=self.interval,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
        )

        # Execute request
        url = self.adapter.get_rest_url()
        response = await self.network_client.get_rest_data(url=url, params=params)

        # Parse and process response
        candles = self.adapter.parse_rest_response(response)
        return self.data_processor.sanitize_candles(candles, interval_seconds)

    except Exception as e:
        self.logger.exception(f"Error fetching candles: {e}")
        return []

start() async

Start polling for updates.

Source code in candles_feed/core/network_strategies.py
228
229
230
231
232
async def start(self) -> None:
    """Start polling for updates."""
    if not self._running:
        self._running = True
        self._polling_task = asyncio.create_task(self._poll_for_updates())

stop() async

Stop polling for updates.

Source code in candles_feed/core/network_strategies.py
234
235
236
237
238
239
async def stop(self) -> None:
    """Stop polling for updates."""
    self._running = False
    if self._polling_task:
        self._polling_task.cancel()
        self._polling_task = None

Protocols

Bases: Protocol

Protocol for exchange adapters.

This protocol defines the interface that all exchange adapters must implement.

Source code in candles_feed/core/protocols.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
@runtime_checkable
class CandleDataAdapter(Protocol):
    """Protocol for exchange adapters.

    This protocol defines the interface that all exchange adapters must implement.
    """

    def get_trading_pair_format(self, trading_pair: str) -> str:
        """Convert standard trading pair format to exchange format.

        Args:
            trading_pair: Trading pair in standard format (e.g., "BTC-USDT")

        Returns:
            Trading pair in exchange-specific format
        """
        ...

    def get_rest_url(self) -> str:
        """Get REST API URL for candles.

        Returns:
            REST API URL
        """
        ...

    def get_ws_url(self) -> str:
        """Get WebSocket URL.

        Returns:
            WebSocket URL
        """
        ...

    def get_rest_params(
        self,
        trading_pair: str,
        interval: str,
        start_time: int | None = None,
        end_time: int | None = None,
        limit: int | None = None,
    ) -> dict:
        """Get parameters for REST API request.

        Args:
            trading_pair: Trading pair
            interval: Candle interval
            start_time: Start time in seconds
            end_time: End time in seconds
            limit: Maximum number of candles to return

        Returns:
            Dictionary of parameters for REST API request
        """
        ...

    def parse_rest_response(self, data: Any) -> list[CandleData]:
        """Parse REST API response into CandleData objects.

        Args:
            data: REST API response

        Returns:
            List of CandleData objects
        """
        ...

    def get_ws_subscription_payload(self, trading_pair: str, interval: str) -> dict:
        """Get WebSocket subscription payload.

        Args:
            trading_pair: Trading pair
            interval: Candle interval

        Returns:
            WebSocket subscription payload
        """
        ...

    def parse_ws_message(self, data: Any) -> list[CandleData] | None:
        """Parse WebSocket message into CandleData objects.

        Args:
            data: WebSocket message

        Returns:
            List of CandleData objects or None if message is not a candle update
        """
        ...

    def get_supported_intervals(self) -> dict[str, int]:
        """Get supported intervals and their durations in seconds.

        Returns:
            Dictionary mapping interval strings to their duration in seconds
        """
        ...

    def get_ws_supported_intervals(self) -> list[str]:
        """Get intervals supported by WebSocket API.

        Returns:
            List of interval strings supported by WebSocket API
        """
        ...

get_rest_params(trading_pair, interval, start_time=None, end_time=None, limit=None)

Get parameters for REST API request.

Parameters:

Name Type Description Default
trading_pair str

Trading pair

required
interval str

Candle interval

required
start_time int | None

Start time in seconds

None
end_time int | None

End time in seconds

None
limit int | None

Maximum number of candles to return

None

Returns:

Type Description
dict

Dictionary of parameters for REST API request

Source code in candles_feed/core/protocols.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def get_rest_params(
    self,
    trading_pair: str,
    interval: str,
    start_time: int | None = None,
    end_time: int | None = None,
    limit: int | None = None,
) -> dict:
    """Get parameters for REST API request.

    Args:
        trading_pair: Trading pair
        interval: Candle interval
        start_time: Start time in seconds
        end_time: End time in seconds
        limit: Maximum number of candles to return

    Returns:
        Dictionary of parameters for REST API request
    """
    ...

get_rest_url()

Get REST API URL for candles.

Returns:

Type Description
str

REST API URL

Source code in candles_feed/core/protocols.py
32
33
34
35
36
37
38
def get_rest_url(self) -> str:
    """Get REST API URL for candles.

    Returns:
        REST API URL
    """
    ...

get_supported_intervals()

Get supported intervals and their durations in seconds.

Returns:

Type Description
dict[str, int]

Dictionary mapping interval strings to their duration in seconds

Source code in candles_feed/core/protocols.py
104
105
106
107
108
109
110
def get_supported_intervals(self) -> dict[str, int]:
    """Get supported intervals and their durations in seconds.

    Returns:
        Dictionary mapping interval strings to their duration in seconds
    """
    ...

get_trading_pair_format(trading_pair)

Convert standard trading pair format to exchange format.

Parameters:

Name Type Description Default
trading_pair str

Trading pair in standard format (e.g., "BTC-USDT")

required

Returns:

Type Description
str

Trading pair in exchange-specific format

Source code in candles_feed/core/protocols.py
21
22
23
24
25
26
27
28
29
30
def get_trading_pair_format(self, trading_pair: str) -> str:
    """Convert standard trading pair format to exchange format.

    Args:
        trading_pair: Trading pair in standard format (e.g., "BTC-USDT")

    Returns:
        Trading pair in exchange-specific format
    """
    ...

get_ws_subscription_payload(trading_pair, interval)

Get WebSocket subscription payload.

Parameters:

Name Type Description Default
trading_pair str

Trading pair

required
interval str

Candle interval

required

Returns:

Type Description
dict

WebSocket subscription payload

Source code in candles_feed/core/protocols.py
81
82
83
84
85
86
87
88
89
90
91
def get_ws_subscription_payload(self, trading_pair: str, interval: str) -> dict:
    """Get WebSocket subscription payload.

    Args:
        trading_pair: Trading pair
        interval: Candle interval

    Returns:
        WebSocket subscription payload
    """
    ...

get_ws_supported_intervals()

Get intervals supported by WebSocket API.

Returns:

Type Description
list[str]

List of interval strings supported by WebSocket API

Source code in candles_feed/core/protocols.py
112
113
114
115
116
117
118
def get_ws_supported_intervals(self) -> list[str]:
    """Get intervals supported by WebSocket API.

    Returns:
        List of interval strings supported by WebSocket API
    """
    ...

get_ws_url()

Get WebSocket URL.

Returns:

Type Description
str

WebSocket URL

Source code in candles_feed/core/protocols.py
40
41
42
43
44
45
46
def get_ws_url(self) -> str:
    """Get WebSocket URL.

    Returns:
        WebSocket URL
    """
    ...

parse_rest_response(data)

Parse REST API response into CandleData objects.

Parameters:

Name Type Description Default
data Any

REST API response

required

Returns:

Type Description
list[CandleData]

List of CandleData objects

Source code in candles_feed/core/protocols.py
70
71
72
73
74
75
76
77
78
79
def parse_rest_response(self, data: Any) -> list[CandleData]:
    """Parse REST API response into CandleData objects.

    Args:
        data: REST API response

    Returns:
        List of CandleData objects
    """
    ...

parse_ws_message(data)

Parse WebSocket message into CandleData objects.

Parameters:

Name Type Description Default
data Any

WebSocket message

required

Returns:

Type Description
list[CandleData] | None

List of CandleData objects or None if message is not a candle update

Source code in candles_feed/core/protocols.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
def parse_ws_message(self, data: Any) -> list[CandleData] | None:
    """Parse WebSocket message into CandleData objects.

    Args:
        data: WebSocket message

    Returns:
        List of CandleData objects or None if message is not a candle update
    """
    ...

Bases: Protocol

Protocol for network connection strategies.

Source code in candles_feed/core/protocols.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
@runtime_checkable
class NetworkStrategy(Protocol):
    """Protocol for network connection strategies."""

    async def poll_once(
        self,
        start_time: int | None = None,
        end_time: int | None = None,
        limit: int | None = None,
    ) -> list[CandleData]:
        """Fetch candles for a specific time range.

        Args:
            start_time: Start time in seconds
            end_time: End time in seconds
            limit: Maximum number of candles to return

        Returns:
            List of CandleData objects
        """
        ...

    async def start(self) -> None:
        """Start the network strategy."""
        ...

    async def stop(self) -> None:
        """Stop the network strategy."""
        ...

poll_once(start_time=None, end_time=None, limit=None) async

Fetch candles for a specific time range.

Parameters:

Name Type Description Default
start_time int | None

Start time in seconds

None
end_time int | None

End time in seconds

None
limit int | None

Maximum number of candles to return

None

Returns:

Type Description
list[CandleData]

List of CandleData objects

Source code in candles_feed/core/protocols.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
async def poll_once(
    self,
    start_time: int | None = None,
    end_time: int | None = None,
    limit: int | None = None,
) -> list[CandleData]:
    """Fetch candles for a specific time range.

    Args:
        start_time: Start time in seconds
        end_time: End time in seconds
        limit: Maximum number of candles to return

    Returns:
        List of CandleData objects
    """
    ...

start() async

Start the network strategy.

Source code in candles_feed/core/protocols.py
172
173
174
async def start(self) -> None:
    """Start the network strategy."""
    ...

stop() async

Stop the network strategy.

Source code in candles_feed/core/protocols.py
176
177
178
async def stop(self) -> None:
    """Stop the network strategy."""
    ...

Bases: Protocol

Protocol for loggers.

Source code in candles_feed/core/protocols.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class Logger(Protocol):
    """Protocol for loggers."""

    def debug(self, msg: str, *args, **kwargs) -> None:
        """Log debug message."""
        ...

    def info(self, msg: str, *args, **kwargs) -> None:
        """Log info message."""
        ...

    def warning(self, msg: str, *args, **kwargs) -> None:
        """Log warning message."""
        ...

    def error(self, msg: str, *args, **kwargs) -> None:
        """Log error message."""
        ...

    def exception(self, msg: str, *args, **kwargs) -> None:
        """Log exception message."""
        ...

debug(msg, *args, **kwargs)

Log debug message.

Source code in candles_feed/core/protocols.py
184
185
186
def debug(self, msg: str, *args, **kwargs) -> None:
    """Log debug message."""
    ...

error(msg, *args, **kwargs)

Log error message.

Source code in candles_feed/core/protocols.py
196
197
198
def error(self, msg: str, *args, **kwargs) -> None:
    """Log error message."""
    ...

exception(msg, *args, **kwargs)

Log exception message.

Source code in candles_feed/core/protocols.py
200
201
202
def exception(self, msg: str, *args, **kwargs) -> None:
    """Log exception message."""
    ...

info(msg, *args, **kwargs)

Log info message.

Source code in candles_feed/core/protocols.py
188
189
190
def info(self, msg: str, *args, **kwargs) -> None:
    """Log info message."""
    ...

warning(msg, *args, **kwargs)

Log warning message.

Source code in candles_feed/core/protocols.py
192
193
194
def warning(self, msg: str, *args, **kwargs) -> None:
    """Log warning message."""
    ...