From 38393b60a6acd9f3a39bbf28450fa58abae1f629 Mon Sep 17 00:00:00 2001 From: uvok Date: Tue, 15 Apr 2025 17:58:45 +0200 Subject: Use groupby for better code --- bla.py | 174 +++++++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 109 insertions(+), 65 deletions(-) (limited to 'bla.py') diff --git a/bla.py b/bla.py index 4fc3475..979271f 100644 --- a/bla.py +++ b/bla.py @@ -2,6 +2,7 @@ import csv from collections import defaultdict from datetime import datetime from decimal import Decimal +from itertools import groupby import logging from typing import Dict, List @@ -9,103 +10,144 @@ from ledger_action import LedgerAction from trade import Trade from trade_queue import FIFOQueue +# Set up a dedicated logger for FIFOQueue +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) -def generate_report(sale_entries, proceeds: float|Decimal, crypto_asset, date_sold): + +def generate_report(sale_entries, proceeds: float | Decimal, crypto_asset, date_sold): report = [] sell_date = datetime.strptime(date_sold, "%Y-%m-%d").strftime("%d.%m.%Y") proceeds = Decimal(proceeds) trade: Trade for trade in sale_entries: - buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime("%d.%m.%Y") - holding_period = (datetime.strptime(date_sold, "%Y-%m-%d") - datetime.strptime(trade.date, "%Y-%m-%d")).days + buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime( + "%d.%m.%Y" + ) + holding_period = ( + datetime.strptime(date_sold, "%Y-%m-%d") + - datetime.strptime(trade.date, "%Y-%m-%d") + ).days short_or_long = "Short" if holding_period < 365 else "Long" total_cost = trade.total_cost gain_or_loss = proceeds - total_cost - report.append({ - "Amount": f"{trade.amount:.8f}", - "Currency": crypto_asset, - "Date Sold": sell_date, - "Date Acquired": buy_date_formatted, - "Short/Long": short_or_long, - "Buy/Input at": "Kraken", - "Sell/Output at": "Kraken", - "Proceeds": f"{proceeds:.2f}", - "Cost Basis": f"{total_cost:.2f}", - "Gain/Loss": f"{gain_or_loss:.2f}", - }) + report.append( + { + "Amount": f"{trade.amount:.8f}", + "Currency": crypto_asset, + "Date Sold": sell_date, + "Date Acquired": buy_date_formatted, + "Short/Long": short_or_long, + "Buy/Input at": "Kraken", + "Sell/Output at": "Kraken", + "Proceeds": f"{proceeds:.2f}", + "Cost Basis": f"{total_cost:.2f}", + "Gain/Loss": f"{gain_or_loss:.2f}", + } + ) return report + def read_kraken_ledger(csv_path: str) -> List[LedgerAction]: - actions :List[LedgerAction] = [] + actions: List[LedgerAction] = [] - with open(csv_path, 'r') as file: + with open(csv_path, "r") as file: reader = csv.DictReader(file) for row in reader: date = row["time"].split(" ")[0] - actions.append(LedgerAction( - type=row["type"], - asset=row["asset"], - amount=Decimal(row["amount"]), - fee=Decimal(row.get("fee", "0")), - refid=row.get("refid", ""), - date=date - )) + actions.append( + LedgerAction( + type=row["type"], + asset=row["asset"], + amount=Decimal(row["amount"]), + fee=Decimal(row.get("fee", "0")), + refid=row.get("refid", ""), + date=date, + ) + ) return actions -def process_ledger(file_path :str, output_path :str): - fifo_queues :Dict[str,FIFOQueue] = {} # Separate FIFO queue per cryptocurrency - trades_by_refid :Dict[str, list[LedgerAction]] = defaultdict(list) - report = [] - actions = read_kraken_ledger(file_path) - for action in actions: +fifo_queues: Dict[str, FIFOQueue] = {} # Separate FIFO queue per cryptocurrency +report = [] + + +def process_trade(refid: str, trades: List[LedgerAction]): + if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) + eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) + crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None) + + if eur_trade and crypto_trade: + crypto_asset = crypto_trade.asset + eur_amount = Decimal(eur_trade.amount) + eur_fee = Decimal(eur_trade.fee) + crypto_amount = Decimal(crypto_trade.amount) + crypto_fee = Decimal(crypto_trade.fee) + fifo_queues.setdefault(crypto_asset, FIFOQueue()) + + date_sold = eur_trade.date + + if eur_amount < 0: # Purchase of cryptocurrency + stake_amount = -eur_amount - eur_fee # Account for EUR fees + crypto_amount -= crypto_fee # Adjust for crypto fees + fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold) + elif eur_amount > 0: # Sale of cryptocurrency + proceeds = eur_amount - eur_fee # Account for EUR fees + sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) + report.extend( + generate_report(sale_entries, proceeds, crypto_asset, date_sold) + ) + else: + raise ValueError(f"Unexpected trade grouping for refid {refid}") + else: + raise ValueError(f"Unexpected number of trades for refid {refid}") + + +def process_ledger(read_actions: List[LedgerAction], output_path: str): + for refid, actions in groupby( + sorted(read_actions, key=lambda a: a.refid), lambda a: a.refid + ): + actions = list(actions) + + if len(actions) == 0: + logger.error("actions is empty") + continue + action = actions[0] + # Group trades by refid if action.type == "trade": - trades_by_refid[action.refid].append(action) + process_trade(refid, actions) - # Handle deposits elif action.type == "deposit" and action.asset != "EUR": + assert len(actions) == 1 + currency = action.asset + fifo_queues.setdefault(currency, FIFOQueue()) + amount = Decimal(action.amount) + price = 0 + current = fifo_queues[currency] + + # remove transaction fees + # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase... + # but I can't check that...) + if len(current): + current.remove_coins(action.fee) + + current.add(amount, price, action.date) + + elif action.type == "withdrawal" and action.asset != "EUR": + assert len(actions) == 1 currency = action.asset fifo_queues.setdefault(currency, FIFOQueue()) amount = Decimal(action.amount) price = 0 # Deposits typically have no associated cost basis fifo_queues[currency].add(amount, price, action.date) - # Process grouped trades - for refid, trades in trades_by_refid.items(): - if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) - eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) - crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None) - - if eur_trade and crypto_trade: - crypto_asset = crypto_trade.asset - eur_amount = Decimal(eur_trade.amount) - eur_fee = Decimal(eur_trade.fee) - crypto_amount = Decimal(crypto_trade.amount) - crypto_fee = Decimal(crypto_trade.fee) - fifo_queues.setdefault(crypto_asset, FIFOQueue()) - - date_sold = eur_trade.date - - if eur_amount < 0: # Purchase of cryptocurrency - stake_amount = -eur_amount - eur_fee # Account for EUR fees - crypto_amount -= crypto_fee # Adjust for crypto fees - fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold) - elif eur_amount > 0: # Sale of cryptocurrency - proceeds = eur_amount - eur_fee # Account for EUR fees - sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) - report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold)) - else: - raise ValueError(f"Unexpected trade grouping for refid {refid}") - else: - raise ValueError(f"Unexpected number of trades for refid {refid}") - # Write report to CSV - with open(output_path, 'w', newline='') as csvfile: + with open(output_path, "w", newline="") as csvfile: fieldnames = [ "Amount", "Currency", @@ -116,15 +158,17 @@ def process_ledger(file_path :str, output_path :str): "Sell/Output at", "Proceeds", "Cost Basis", - "Gain/Loss" + "Gain/Loss", ] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() writer.writerows(report) + logging.basicConfig(level=logging.DEBUG) # Usage ledger_path = "kraken_ledger.csv" # Replace with your ledger file path output_path = "tax_report.csv" # Replace with your desired output file path -process_ledger(ledger_path, output_path) +actions = read_kraken_ledger(ledger_path) +process_ledger(actions, output_path) -- cgit v1.2.3