from decimal import Decimal from itertools import groupby import logging from typing import Dict, List from ledger_action import LedgerAction from trade import Trade from trade_queue import FIFOQueue logger = logging.getLogger(__name__) class LedgerProcess: def __init__(self): # Separate FIFO queue per cryptocurrency self.fifo_queues: Dict[str, FIFOQueue] = {} self.external_wallet: Dict[str, FIFOQueue] = {} def process_ledger(self, read_actions: List[LedgerAction]): # don't make any assumtptions about ledger sorting # groupby requires sorted inputs actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp)) _grouped_actions = groupby(actions_sorted, lambda a: a.refid) _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions] # finally, sort groupy by first available timestamp sorted_grouped_actions = sorted( _list_tuple_actions, key=lambda a: a[1][0].timestamp ) for refid, actions in sorted_grouped_actions: actions = list(actions) if len(actions) == 0: logger.error("actions is empty") continue action = actions[0] # Group trades by refid if action.type == "trade": assert len(actions) == 2 self._process_trade(refid, actions) elif action.type == "deposit" and action.asset != "EUR": assert len(actions) == 1 logger.error("Don't know how do handle deposits yet.") # currency = action.asset # fifo_queues.setdefault(currency, FIFOQueue()) # amount = Decimal(action.amount) # price = 0 # current = fifo_queues[currency] # # remove transaction fees # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase... # # but I can't check that...) # if len(current): # current.remove_coins(action.fee) # current.add(amount, price, action.date) elif action.type == "withdrawal" and action.asset != "EUR": assert len(actions) == 1 currency = action.asset self.external_wallet.setdefault(currency, FIFOQueue()) self.fifo_queues.setdefault(currency, FIFOQueue()) t = self.fifo_queues[currency].remove(lambda t: t.amount == -action.amount + action.fee) t.remove_coins(action.fee) self.external_wallet[currency].add_trade(t) def _process_trade(self, refid: str, trades: List[LedgerAction]): if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) crypto_trade = next( (trade for trade in trades if trade.asset != "EUR"), None ) if eur_trade and crypto_trade: crypto_asset = crypto_trade.asset eur_amount = Decimal(eur_trade.amount) eur_fee = Decimal(eur_trade.fee) crypto_amount = Decimal(crypto_trade.amount) crypto_fee = Decimal(crypto_trade.fee) self.fifo_queues.setdefault(crypto_asset, FIFOQueue()) date_sold = eur_trade.date if eur_amount < 0: # Purchase of cryptocurrency stake_amount = -eur_amount - eur_fee # Account for EUR fees crypto_amount -= crypto_fee # Adjust for crypto fees self.fifo_queues[crypto_asset].add_trade( Trade(crypto_amount, stake_amount, date_sold, refid=refid) ) elif eur_amount > 0: # Sale of cryptocurrency proceeds = eur_amount - eur_fee self.fifo_queues[crypto_asset].add_trade( Trade(crypto_amount, -proceeds, date_sold, refid=refid) ) # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) # report.extend( # generate_report(sale_entries, proceeds, crypto_asset, date_sold) # ) else: raise ValueError(f"Unexpected trade grouping for refid {refid}") else: raise ValueError(f"Unexpected number of trades for refid {refid}")