blob: 9cae56088b9230a9a2750cecce2aa82a9735360c (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
import logging
from collections import deque
from decimal import Decimal
from typing import Deque, List
from trade import Trade
# Set up a dedicated logger for FIFOQueue
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class FIFOQueue:
"""
Crypto trading FIFO queue.
Will track trades.
"""
def __init__(self) -> None:
self.queue: Deque[Trade] = deque()
self._cached_total: Decimal = Decimal(0)
self._cache_valid: bool = True
logger.info("FIFOQueue initialized with empty queue.")
def add(self, amount: float | Decimal, total_cost: float | Decimal, date: str) -> None:
"""
Add a trade to the queue.
"""
trade = Trade(amount, total_cost, date)
self.queue.append(trade)
self._cache_valid = False
logger.info(f"Added trade: {trade}.")
def remove_coins(self, amount: float | Decimal) -> List[Trade]:
"""
Remove a specified amount of coins from the queue, returning the
trades used to buy. This can be used to calculate profit/loss.
"""
if amount <= 0:
logger.error("Attempted to remove non-positive amount.")
raise ValueError("The amount to remove must be positive.")
amount = Decimal(amount)
if amount > self.get_remaining_amount():
logger.error(f"Insufficient assets to process sale of {amount}.")
raise ValueError(f"Insufficient assets in queue to process sale of {amount}.")
logger.debug(f"Removing {amount:.2f} coins from the queue.")
logger.info("Cache invalidated before removing coins.")
self._cache_valid = False
remaining: Decimal = amount
entries: List[Trade] = []
while remaining > 0:
trade = self.queue[0]
logger.debug(f"Processing trade: {trade}")
if trade.amount > remaining:
ppc = trade.price_per_coin
trade.remove_coins(remaining)
entries.append(Trade(remaining, remaining * ppc, trade.date))
logger.info(f"Partial removal from trade: {remaining:.2f} coins.")
break
else:
remaining -= trade.amount
entries.append(trade)
self.queue.popleft()
logger.info(f"Removed full trade: {trade}. Remaining coins to remove: {remaining}")
return entries
def get_remaining_amount(self) -> Decimal:
"""
Calculate the total remaining amount in the queue.
"""
if not self._cache_valid:
logger.debug("Cache invalid, recalculating remaining amount.")
self._cached_total = sum((trade.amount for trade in self.queue), Decimal(0))
self._cache_valid = True
logger.info(f"Cache recalculated: {self._cached_total:.2f}")
logger.debug(f"Returning cached remaining amount: {self._cached_total:.2f}")
return self._cached_total
|