from decimal import Decimal from itertools import groupby import logging from typing import Dict, List from exceptions import MismatchedTradeError from ledger_action import LedgerAction from trade import Trade from trade_queue import FIFOQueue logger = logging.getLogger(__name__) class LedgerProcess: def __init__(self): # Separate FIFO queue per cryptocurrency self.fifo_queues: Dict[str, FIFOQueue] = {} self.external_wallet: Dict[str, FIFOQueue] = {} def process_ledger(self, read_actions: List[LedgerAction]): # don't make any assumtptions about ledger sorting # groupby requires sorted inputs actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp)) grouped_actions = groupby(actions_sorted, lambda a: a.refid) process_after: List[LedgerAction] = [] for refid, actions in grouped_actions: actions = list(actions) if len(actions) == 0: logger.error("actions is empty") continue action = actions[0] # Group trades by refid if action.type == "trade": self._process_trade(refid, actions) elif action.asset == "EUR": continue elif action.type == "deposit" or action.type == "withdrawal": assert len(actions) == 1 process_after.append(action) for action in sorted(process_after, key=lambda a: a.timestamp): if action.type == "deposit" and action.asset != "EUR": self._process_deposit(action) elif action.type == "withdrawal" and action.asset != "EUR": self._process_withdrawal(action) def _process_deposit(self, action: LedgerAction): assert action.amount > 0 assert action.fee >= 0 currency = action.asset self.external_wallet.setdefault(currency, FIFOQueue()) self.fifo_queues.setdefault(currency, FIFOQueue()) t = self.external_wallet[currency].remove( lambda t: t.amount == action.amount and t.timestamp < action.timestamp ) # Fee handling: Kraken shows "full" amount first, I have to subtract the fee. wallet = self.fifo_queues[currency] t.remove_coins(action.fee) wallet.add_trade(t) # TODO: Sort logger.error("Don't know how do handle deposits yet.") def _process_withdrawal(self, action: LedgerAction): assert action.amount < 0 assert action.fee >= 0 currency = action.asset self.external_wallet.setdefault(currency, FIFOQueue()) self.fifo_queues.setdefault(currency, FIFOQueue()) withdraw_total = -action.amount + action.fee t = self.fifo_queues[currency].remove( lambda t: t.amount == withdraw_total and t.timestamp < action.timestamp ) t.remove_coins(action.fee) self.external_wallet[currency].add_trade(t) def _process_trade(self, refid: str, trades: List[LedgerAction]): if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) crypto_trade = next( (trade for trade in trades if trade.asset != "EUR"), None ) if eur_trade and crypto_trade: crypto_asset = crypto_trade.asset eur_amount = Decimal(eur_trade.amount) eur_fee = Decimal(eur_trade.fee) crypto_amount = Decimal(crypto_trade.amount) crypto_fee = Decimal(crypto_trade.fee) self.fifo_queues.setdefault(crypto_asset, FIFOQueue()) date_sold = eur_trade.date if eur_amount < 0: # Purchase of cryptocurrency assert crypto_amount > 0 assert eur_fee >= 0 assert crypto_fee >= 0 cost = -eur_amount - eur_fee # Account for EUR fees crypto_amount -= crypto_fee # Adjust for crypto fees self.fifo_queues[crypto_asset].add_trade( Trade(crypto_amount, cost, date_sold, refid=refid) ) elif eur_amount > 0: # Sale of cryptocurrency # simply add to queue, don't process it yet assert crypto_amount < 0 assert eur_fee >= 0 assert crypto_fee >= 0 proceeds = eur_amount - eur_fee crypto_amount += crypto_fee # Adjust for crypto fees self.fifo_queues[crypto_asset].add_trade( Trade(crypto_amount, -proceeds, date_sold, refid=refid) ) else: logger.error(f"Trade group doesn't have expected currencies.") raise MismatchedTradeError( f"Unexpected trade grouping for refid {refid}" ) else: logger.error(f"Trade group has {len(trades)} trades, expected 2.") raise MismatchedTradeError(f"Unexpected number of trades for refid {refid}")