From ae8212598e4fc21caf587e5d58b7be2c1e98bb5d Mon Sep 17 00:00:00 2001 From: uvok Date: Fri, 18 Apr 2025 11:43:35 +0200 Subject: Extract LedgerProcess --- ledger_process.py | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ pnlcalc.py | 89 ++++----------------------------------------- 2 files changed, 111 insertions(+), 83 deletions(-) create mode 100644 ledger_process.py diff --git a/ledger_process.py b/ledger_process.py new file mode 100644 index 0000000..7703e58 --- /dev/null +++ b/ledger_process.py @@ -0,0 +1,105 @@ +from decimal import Decimal +from itertools import groupby +import logging +from typing import Dict, List + +from ledger_action import LedgerAction +from trade import Trade +from trade_queue import FIFOQueue + +logger = logging.getLogger(__name__) + + +class LedgerProcess: + def __init__(self): + # Separate FIFO queue per cryptocurrency + self.fifo_queues: Dict[str, FIFOQueue] = {} + self.external_wallet: Dict[str, FIFOQueue] = {} + + def process_ledger(self, read_actions: List[LedgerAction]): + # don't make any assumtptions about ledger sorting + # groupby requires sorted inputs + actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp)) + _grouped_actions = groupby(actions_sorted, lambda a: a.refid) + _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions] + # finally, sort groupy by first available timestamp + sorted_grouped_actions = sorted( + _list_tuple_actions, key=lambda a: a[1][0].timestamp + ) + + for refid, actions in sorted_grouped_actions: + actions = list(actions) + + if len(actions) == 0: + logger.error("actions is empty") + continue + action = actions[0] + + # Group trades by refid + if action.type == "trade": + self._process_trade(refid, actions) + + elif action.type == "deposit" and action.asset != "EUR": + assert len(actions) == 1 + logger.error("Don't know how do handle deposits yet.") + # currency = action.asset + # fifo_queues.setdefault(currency, FIFOQueue()) + # amount = Decimal(action.amount) + # price = 0 + # current = fifo_queues[currency] + + # # remove transaction fees + # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase... + # # but I can't check that...) + # if len(current): + # current.remove_coins(action.fee) + + # current.add(amount, price, action.date) + + elif action.type == "withdrawal" and action.asset != "EUR": + assert len(actions) == 1 + logger.error("Don't know how do handle withdrawals yet.") + # currency = action.asset + # fifo_queues.setdefault(currency, FIFOQueue()) + # amount = Decimal(action.amount) + # price = 0 # Deposits typically have no associated cost basis + # current = fifo_queues[currency] + + # current.add(amount, price, action.date) + + def _process_trade(self, refid: str, trades: List[LedgerAction]): + if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) + eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) + crypto_trade = next( + (trade for trade in trades if trade.asset != "EUR"), None + ) + + if eur_trade and crypto_trade: + crypto_asset = crypto_trade.asset + eur_amount = Decimal(eur_trade.amount) + eur_fee = Decimal(eur_trade.fee) + crypto_amount = Decimal(crypto_trade.amount) + crypto_fee = Decimal(crypto_trade.fee) + self.fifo_queues.setdefault(crypto_asset, FIFOQueue()) + + date_sold = eur_trade.date + + if eur_amount < 0: # Purchase of cryptocurrency + stake_amount = -eur_amount - eur_fee # Account for EUR fees + crypto_amount -= crypto_fee # Adjust for crypto fees + self.fifo_queues[crypto_asset].add_trade( + Trade(crypto_amount, stake_amount, date_sold, refid=refid) + ) + elif eur_amount > 0: # Sale of cryptocurrency + proceeds = eur_amount - eur_fee + self.fifo_queues[crypto_asset].add_trade( + Trade(crypto_amount, -proceeds, date_sold, refid=refid) + ) + # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) + # report.extend( + # generate_report(sale_entries, proceeds, crypto_asset, date_sold) + # ) + else: + raise ValueError(f"Unexpected trade grouping for refid {refid}") + else: + raise ValueError(f"Unexpected number of trades for refid {refid}") diff --git a/pnlcalc.py b/pnlcalc.py index 309a0b0..51152c2 100644 --- a/pnlcalc.py +++ b/pnlcalc.py @@ -7,6 +7,7 @@ from typing import Dict, List from kraken import read_ledger from ledger_action import LedgerAction +from ledger_process import LedgerProcess from trade import Trade from trade_queue import FIFOQueue @@ -54,91 +55,10 @@ def generate_report(sale_entries, proceeds: Decimal, crypto_asset, date_sold): return report -fifo_queues: Dict[str, FIFOQueue] = {} # Separate FIFO queue per cryptocurrency report = [] -def process_trade(refid: str, trades: List[LedgerAction]): - if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) - eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) - crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None) - - if eur_trade and crypto_trade: - crypto_asset = crypto_trade.asset - eur_amount = Decimal(eur_trade.amount) - eur_fee = Decimal(eur_trade.fee) - crypto_amount = Decimal(crypto_trade.amount) - crypto_fee = Decimal(crypto_trade.fee) - fifo_queues.setdefault(crypto_asset, FIFOQueue()) - - date_sold = eur_trade.date - - if eur_amount < 0: # Purchase of cryptocurrency - stake_amount = -eur_amount - eur_fee # Account for EUR fees - crypto_amount -= crypto_fee # Adjust for crypto fees - fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, stake_amount, date_sold, refid=refid)) - elif eur_amount > 0: # Sale of cryptocurrency - proceeds = eur_amount - eur_fee - fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, -proceeds, date_sold, refid=refid)) - # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) - # report.extend( - # generate_report(sale_entries, proceeds, crypto_asset, date_sold) - # ) - else: - raise ValueError(f"Unexpected trade grouping for refid {refid}") - else: - raise ValueError(f"Unexpected number of trades for refid {refid}") - - -def process_ledger(read_actions: List[LedgerAction], output_path: str): - # don't make any assumtptions about ledger sorting - # groupby requires sorted inputs - actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp)) - _grouped_actions = groupby(actions_sorted, lambda a: a.refid) - _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions] - # finally, sort groupy by first available timestamp - sorted_grouped_actions = sorted(_list_tuple_actions, key=lambda a: a[1][0].timestamp) - - for refid, actions in sorted_grouped_actions: - actions = list(actions) - - if len(actions) == 0: - logger.error("actions is empty") - continue - action = actions[0] - - # Group trades by refid - if action.type == "trade": - process_trade(refid, actions) - - elif action.type == "deposit" and action.asset != "EUR": - assert len(actions) == 1 - logger.error("Don't know how do handle deposits yet.") - # currency = action.asset - # fifo_queues.setdefault(currency, FIFOQueue()) - # amount = Decimal(action.amount) - # price = 0 - # current = fifo_queues[currency] - - # # remove transaction fees - # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase... - # # but I can't check that...) - # if len(current): - # current.remove_coins(action.fee) - - # current.add(amount, price, action.date) - - elif action.type == "withdrawal" and action.asset != "EUR": - assert len(actions) == 1 - logger.error("Don't know how do handle withdrawals yet.") - # currency = action.asset - # fifo_queues.setdefault(currency, FIFOQueue()) - # amount = Decimal(action.amount) - # price = 0 # Deposits typically have no associated cost basis - # current = fifo_queues[currency] - - # current.add(amount, price, action.date) - +def write_report(output_path: str): # Write report to CSV with open(output_path, "w", newline="") as csvfile: fieldnames = [ @@ -162,4 +82,7 @@ def process_ledger(read_actions: List[LedgerAction], output_path: str): ledger_path = "ledgers.csv" # Replace with your ledger file path output_path = "tax_report.csv" # Replace with your desired output file path actions = read_ledger(ledger_path) -process_ledger(actions, output_path) +lp = LedgerProcess() +lp.process_ledger(actions) +pass +# write_report(output_path) -- cgit v1.2.3