1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
from decimal import Decimal
from itertools import groupby
import logging
from typing import Dict, List
from exceptions import MismatchedTradeError
from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue
logger = logging.getLogger(__name__)
class LedgerProcess:
    """Replay ledger rows into per-asset FIFO queues.

    Two dictionaries of queues are maintained, both keyed by asset code:

    * ``fifo_queues`` receives the lots produced by trades and by deposits.
    * ``external_wallet`` receives lots moved out by withdrawals; deposits
      take their lot back out of it.

    Fee convention used by every method: a ledger row changes the balance by
    ``amount - fee``. The fee is non-negative and is always debited on top of
    the signed amount, so it reduces what is received and increases what is
    spent.
    """

    def __init__(self):
        # Separate FIFO queue per cryptocurrency
        self.fifo_queues: Dict[str, FIFOQueue] = {}
        # Lots that left via a withdrawal, again one queue per asset.
        self.external_wallet: Dict[str, FIFOQueue] = {}

    def process_ledger(self, read_actions: List[LedgerAction]):
        """Process a complete ledger.

        Rows are grouped by ``refid``. The first row of each group decides
        how the group is handled:

        * ``trade`` groups are handed to ``_process_trade`` immediately.
        * Any other group whose first row is in EUR is skipped.
        * ``deposit`` / ``withdrawal`` rows are collected and replayed in
          timestamp order once all groups have been visited.
        * Groups of any other type are ignored.

        Args:
            read_actions: Ledger rows in arbitrary order.

        Raises:
            MismatchedTradeError: Propagated from ``_process_trade``.
            AssertionError: If a deposit/withdrawal refid has more than one
                row, or a row violates a sign expectation.
        """
        # Don't make any assumptions about ledger sorting;
        # groupby requires its input sorted by the grouping key.
        actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp))
        process_after: List[LedgerAction] = []

        for refid, group in groupby(actions_sorted, lambda a: a.refid):
            actions = list(group)
            if not actions:
                # Defensive only: groupby does not produce empty groups.
                logger.error("actions is empty")
                continue

            action = actions[0]
            if action.type == "trade":
                self._process_trade(refid, actions)
            elif action.asset == "EUR":
                continue
            elif action.type in ("deposit", "withdrawal"):
                assert len(actions) == 1, (
                    f"expected one row for refid {refid}, got {len(actions)}"
                )
                process_after.append(action)

        # Transfers are replayed chronologically after all trades are queued.
        # EUR rows never reach this list (filtered above).
        for action in sorted(process_after, key=lambda a: a.timestamp):
            if action.type == "deposit":
                self._process_deposit(action)
            elif action.type == "withdrawal":
                self._process_withdrawal(action)

    def _process_deposit(self, action: LedgerAction):
        """Move a lot from the external wallet back into ``fifo_queues``.

        The lot is selected from ``external_wallet`` by exact amount match and
        must be older than the deposit. The deposit fee is removed from the
        lot before it is queued.

        Args:
            action: A deposit row with positive amount and non-negative fee.
        """
        assert action.amount > 0, "deposit amount must be positive"
        assert action.fee >= 0, "deposit fee must not be negative"

        currency = action.asset
        external = self.external_wallet.setdefault(currency, FIFOQueue())
        wallet = self.fifo_queues.setdefault(currency, FIFOQueue())

        # NOTE(review): assumes remove() returns the matching lot; what happens
        # when nothing matches is decided by FIFOQueue - confirm.
        lot = external.remove(
            lambda t: t.amount == action.amount and t.timestamp < action.timestamp
        )
        # Fee handling: Kraken shows "full" amount first, I have to subtract the fee.
        lot.remove_coins(action.fee)
        wallet.add_trade(lot)
        # TODO: Sort
        logger.error("Don't know how do handle deposits yet.")

    def _process_withdrawal(self, action: LedgerAction):
        """Move a lot from ``fifo_queues`` into the external wallet.

        The amount leaving is ``-amount + fee`` (the fee is debited on top of
        the withdrawn amount). A lot of exactly that size, older than the
        withdrawal, is taken from the queue; the fee is removed from it and
        the remainder is stored in ``external_wallet``.

        Args:
            action: A withdrawal row with negative amount and non-negative fee.
        """
        assert action.amount < 0, "withdrawal amount must be negative"
        assert action.fee >= 0, "withdrawal fee must not be negative"

        currency = action.asset
        external = self.external_wallet.setdefault(currency, FIFOQueue())
        wallet = self.fifo_queues.setdefault(currency, FIFOQueue())

        withdraw_total = -action.amount + action.fee
        # NOTE(review): same assumption about remove() as in _process_deposit.
        lot = wallet.remove(
            lambda t: t.amount == withdraw_total and t.timestamp < action.timestamp
        )
        lot.remove_coins(action.fee)
        external.add_trade(lot)

    def _process_trade(self, refid: str, trades: List[LedgerAction]):
        """Queue one EUR/crypto trade described by two ledger rows.

        A purchase (EUR amount negative) is queued with a positive coin
        amount and a positive EUR cost. A sale (EUR amount positive) is
        queued with a negative coin amount and negated proceeds; it is only
        queued here, not matched against earlier purchases. A zero EUR amount
        queues nothing.

        Fees follow the ``amount - fee`` convention on both legs, matching
        ``_process_withdrawal``: fees raise the cost of a purchase and the
        number of coins given up in a sale, and lower the coins received and
        the proceeds.

        Args:
            refid: Reference id shared by both rows.
            trades: The rows of the trade, expected to be one EUR row and one
                non-EUR row.

        Raises:
            MismatchedTradeError: If there are not exactly two rows, or they
                are not one EUR row plus one non-EUR row.
        """
        if len(trades) != 2:  # Need exactly two related rows (EUR + crypto)
            logger.error("Trade group has %d trades, expected 2.", len(trades))
            raise MismatchedTradeError(f"Unexpected number of trades for refid {refid}")

        eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
        crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None)
        if not (eur_trade and crypto_trade):
            logger.error("Trade group doesn't have expected currencies.")
            raise MismatchedTradeError(
                f"Unexpected trade grouping for refid {refid}"
            )

        crypto_asset = crypto_trade.asset
        eur_amount = Decimal(eur_trade.amount)
        eur_fee = Decimal(eur_trade.fee)
        crypto_amount = Decimal(crypto_trade.amount)
        crypto_fee = Decimal(crypto_trade.fee)
        queue = self.fifo_queues.setdefault(crypto_asset, FIFOQueue())
        # NOTE(review): the other methods compare on `timestamp`; this reads
        # `date` - confirm LedgerAction provides both.
        date_sold = eur_trade.date

        if eur_amount < 0:  # Purchase of cryptocurrency
            assert crypto_amount > 0, "purchase must credit crypto"
            assert eur_fee >= 0, "EUR fee must not be negative"
            assert crypto_fee >= 0, "crypto fee must not be negative"
            # EUR actually spent: the fee is paid in addition to the amount.
            cost = -eur_amount + eur_fee
            # Coins actually received: the fee is taken out of the credit.
            received = crypto_amount - crypto_fee
            queue.add_trade(Trade(received, cost, date_sold, refid=refid))
        elif eur_amount > 0:  # Sale of cryptocurrency
            # simply add to queue, don't process it yet
            assert crypto_amount < 0, "sale must debit crypto"
            assert eur_fee >= 0, "EUR fee must not be negative"
            assert crypto_fee >= 0, "crypto fee must not be negative"
            # EUR actually received: the fee is taken out of the credit.
            proceeds = eur_amount - eur_fee
            # Coins actually given up (negative): the fee is debited on top.
            disposed = crypto_amount - crypto_fee
            queue.add_trade(Trade(disposed, -proceeds, date_sold, refid=refid))
|