1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
from decimal import Decimal
from itertools import groupby
import logging
from typing import Dict, List
from exceptions import MismatchedTradeError
from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue
logger = logging.getLogger(__name__)
class LedgerProcess:
def __init__(self):
# Separate FIFO queue per cryptocurrency
self.fifo_queues: Dict[str, FIFOQueue] = {}
self.external_wallet: Dict[str, FIFOQueue] = {}
def process_ledger(self, read_actions: List[LedgerAction]):
for currency in {ac.asset for ac in read_actions}:
self.external_wallet.setdefault(currency, FIFOQueue())
self.fifo_queues.setdefault(currency, FIFOQueue())
# --- Trades
# don't make any assumtptions about ledger sorting
# groupby requires sorted inputs
trades_sorted = sorted(
(ac for ac in read_actions if ac.type == "trade"),
key=lambda a: (a.refid, a.timestamp),
)
grouped_actions = groupby(trades_sorted, lambda a: a.refid)
for refid, actions in grouped_actions:
actions = list(actions)
self._process_trade(refid, actions)
# --- Deposit/Withdrawal
for action in sorted(
(
ac
for ac in read_actions
if ac.type in ["deposit", "withdrawal"] and ac.asset != "EUR"
),
key=lambda a: a.timestamp,
):
if action.type == "deposit":
self._process_deposit(action)
elif action.type == "withdrawal":
self._process_withdrawal(action)
def _process_deposit(self, action: LedgerAction):
assert action.amount > 0
assert action.fee >= 0
currency = action.asset
t = self.external_wallet[currency].remove(
lambda t: t.amount == action.amount and t.timestamp < action.timestamp
)
# Fee handling: Kraken shows "full" amount first, I have to subtract the fee.
wallet = self.fifo_queues[currency]
t.remove_coins(action.fee)
wallet.add_trade(t)
def _process_withdrawal(self, action: LedgerAction):
assert action.amount < 0
assert action.fee >= 0
currency = action.asset
withdraw_total = -action.amount + action.fee
t = self.fifo_queues[currency].remove(
lambda t: t.amount == withdraw_total and t.timestamp < action.timestamp
)
t.remove_coins(action.fee)
self.external_wallet[currency].add_trade(t)
def _process_trade(self, refid: str, trades: List[LedgerAction]):
if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
crypto_trade = next(
(trade for trade in trades if trade.asset != "EUR"), None
)
if eur_trade and crypto_trade:
crypto_asset = crypto_trade.asset
eur_amount = Decimal(eur_trade.amount)
eur_fee = Decimal(eur_trade.fee)
crypto_amount = Decimal(crypto_trade.amount)
crypto_fee = Decimal(crypto_trade.fee)
date_sold = eur_trade.date
if eur_amount < 0: # Purchase of cryptocurrency
assert crypto_amount > 0
assert eur_fee >= 0
assert crypto_fee >= 0
cost = -eur_amount - eur_fee # Account for EUR fees
crypto_amount -= crypto_fee # Adjust for crypto fees
self.fifo_queues[crypto_asset].add_trade(
Trade(crypto_amount, cost, date_sold, refid=refid)
)
elif eur_amount > 0: # Sale of cryptocurrency
# simply add to queue, don't process it yet
assert crypto_amount < 0
assert eur_fee >= 0
assert crypto_fee >= 0
proceeds = eur_amount - eur_fee
crypto_amount += crypto_fee # Adjust for crypto fees
self.fifo_queues[crypto_asset].add_trade(
Trade(crypto_amount, -proceeds, date_sold, refid=refid)
)
else:
logger.error(f"Trade group doesn't have expected currencies.")
raise MismatchedTradeError(
f"Unexpected trade grouping for refid {refid}"
)
else:
logger.error(f"Trade group has {len(trades)} trades, expected 2.")
raise MismatchedTradeError(f"Unexpected number of trades for refid {refid}")
|