1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
from decimal import Decimal
from itertools import groupby
import logging
from typing import Dict, List
from exceptions import MismatchedTradeError
from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue
logger = logging.getLogger(__name__)
class LedgerProcess:
def __init__(self):
# Separate FIFO queue per cryptocurrency
self.fifo_queues: Dict[str, FIFOQueue] = {}
self.external_wallet: Dict[str, FIFOQueue] = {}
def process_ledger(self, read_actions: List[LedgerAction]):
# don't make any assumtptions about ledger sorting
# groupby requires sorted inputs
actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp))
_grouped_actions = groupby(actions_sorted, lambda a: a.refid)
_list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions]
# finally, sort groupy by first available timestamp
sorted_grouped_actions = sorted(
_list_tuple_actions, key=lambda a: a[1][0].timestamp
)
for refid, actions in sorted_grouped_actions:
actions = list(actions)
if len(actions) == 0:
logger.error("actions is empty")
continue
action = actions[0]
# Group trades by refid
if action.type == "trade":
self._process_trade(refid, actions)
elif action.type == "deposit" and action.asset != "EUR":
assert len(actions) == 1
self._process_deposit(action)
elif action.type == "withdrawal" and action.asset != "EUR":
assert len(actions) == 1
self._process_withdrawal(action)
def _process_deposit(self, action: LedgerAction):
assert action.amount > 0
assert action.fee >= 0
currency = action.asset
self.external_wallet.setdefault(currency, FIFOQueue())
self.fifo_queues.setdefault(currency, FIFOQueue())
t = self.external_wallet[currency].remove(lambda t: t.amount == action.amount)
# TODO: Fee handling
assert action.fee == 0
wallet = self.fifo_queues[currency]
wallet.add_trade(t)
# TODO: Sort
logger.error("Don't know how do handle deposits yet.")
def _process_withdrawal(self, action: LedgerAction):
assert action.amount < 0
assert action.fee >= 0
currency = action.asset
self.external_wallet.setdefault(currency, FIFOQueue())
self.fifo_queues.setdefault(currency, FIFOQueue())
t = self.fifo_queues[currency].remove(
lambda t: t.amount == -action.amount + action.fee
)
t.remove_coins(action.fee)
self.external_wallet[currency].add_trade(t)
def _process_trade(self, refid: str, trades: List[LedgerAction]):
if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
crypto_trade = next(
(trade for trade in trades if trade.asset != "EUR"), None
)
if eur_trade and crypto_trade:
crypto_asset = crypto_trade.asset
eur_amount = Decimal(eur_trade.amount)
eur_fee = Decimal(eur_trade.fee)
crypto_amount = Decimal(crypto_trade.amount)
crypto_fee = Decimal(crypto_trade.fee)
self.fifo_queues.setdefault(crypto_asset, FIFOQueue())
date_sold = eur_trade.date
if eur_amount < 0: # Purchase of cryptocurrency
assert crypto_amount > 0
assert eur_fee >= 0
assert crypto_fee >= 0
cost = -eur_amount - eur_fee # Account for EUR fees
crypto_amount -= crypto_fee # Adjust for crypto fees
self.fifo_queues[crypto_asset].add_trade(
Trade(crypto_amount, cost, date_sold, refid=refid)
)
elif eur_amount > 0: # Sale of cryptocurrency
# simply add to queue, don't process it yet
assert crypto_amount < 0
assert eur_fee >= 0
assert crypto_fee >= 0
proceeds = eur_amount - eur_fee
crypto_amount += crypto_fee # Adjust for crypto fees
self.fifo_queues[crypto_asset].add_trade(
Trade(crypto_amount, -proceeds, date_sold, refid=refid)
)
else:
logger.error(f"Trade group doesn't have expected currencies.")
raise MismatchedTradeError(
f"Unexpected trade grouping for refid {refid}"
)
else:
logger.error(f"Trade group has {len(trades)} trades, expected 2.")
raise MismatchedTradeError(f"Unexpected number of trades for refid {refid}")
|