import csv
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from itertools import groupby
import logging
from typing import Dict, List

from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue

# Module-level logger for the ledger-processing code in this file.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def generate_report(sale_entries, proceeds: float | Decimal, crypto_asset, date_sold):
    """Build one report row per FIFO lot consumed by a single sale.

    Args:
        sale_entries: Iterable of ``Trade`` lots consumed by the sale. Each
            lot must expose ``date`` ("YYYY-MM-DD"), ``amount`` and
            ``total_cost``.
        proceeds: Total net proceeds of the whole sale.
        crypto_asset: Asset code written to the "Currency" column.
        date_sold: Sale date as "YYYY-MM-DD".

    Returns:
        A list of dicts keyed by the report column names.

    The total proceeds are split across the lots in proportion to each lot's
    amount, so that a sale spanning several lots does not report the full
    proceeds once per lot.
    """
    sold_on = datetime.strptime(date_sold, "%Y-%m-%d")
    sell_date = sold_on.strftime("%d.%m.%Y")
    proceeds = Decimal(proceeds)

    lots = list(sale_entries)
    total_amount = sum((Decimal(lot.amount) for lot in lots), Decimal(0))

    rows = []
    trade: Trade
    for trade in lots:
        acquired_on = datetime.strptime(trade.date, "%Y-%m-%d")
        holding_period = (sold_on - acquired_on).days
        short_or_long = "Short" if holding_period < 365 else "Long"

        if total_amount:
            lot_proceeds = proceeds * Decimal(trade.amount) / total_amount
        else:
            # Degenerate case (all lots have zero amount): nothing to
            # apportion by, so fall back to the undivided proceeds.
            lot_proceeds = proceeds

        total_cost = trade.total_cost
        gain_or_loss = lot_proceeds - total_cost

        rows.append(
            {
                "Amount": f"{trade.amount:.8f}",
                "Currency": crypto_asset,
                "Date Sold": sell_date,
                "Date Acquired": acquired_on.strftime("%d.%m.%Y"),
                "Short/Long": short_or_long,
                "Buy/Input at": "Kraken",
                "Sell/Output at": "Kraken",
                "Proceeds": f"{lot_proceeds:.2f}",
                "Cost Basis": f"{total_cost:.2f}",
                "Gain/Loss": f"{gain_or_loss:.2f}",
            }
        )
    return rows


def parse_kraken_row(row: dict) -> LedgerAction:
    """Convert one CSV row of a Kraken ledger export into a ``LedgerAction``.

    Only the date part of the ``time`` column is kept. A missing or empty
    ``fee`` is treated as zero; a missing ``refid`` becomes an empty string.
    """
    date = row["time"].split(" ")[0]
    return LedgerAction(
        type=row["type"],
        asset=row["asset"],
        amount=Decimal(row["amount"]),
        # `or "0"` also covers an empty cell, which Decimal() cannot parse.
        fee=Decimal(row.get("fee") or "0"),
        refid=row.get("refid", ""),
        date=date,
    )


def read_kraken_ledger(csv_path: str) -> List[LedgerAction]:
    """Read a Kraken ledger CSV and return its rows as ``LedgerAction`` objects."""
    # newline="" is what the csv module requires for correct handling of
    # newlines embedded in quoted fields.
    with open(csv_path, "r", newline="") as file:
        return [parse_kraken_row(row) for row in csv.DictReader(file)]


fifo_queues: Dict[str, FIFOQueue] = {}  # Separate FIFO queue per cryptocurrency
report = []  # Report rows accumulated by process_trade, written by process_ledger


def process_trade(refid: str, trades: List[LedgerAction]):
    """Apply one EUR<->crypto trade (two ledger rows sharing a refid).

    A negative EUR amount is a purchase: the received coins are added to the
    asset's FIFO queue. A positive EUR amount is a sale: coins are removed
    from the queue and the resulting rows are appended to the module-level
    ``report``. A zero EUR amount is ignored.

    Raises:
        ValueError: If the group is not exactly one EUR row plus one
            non-EUR row.
    """
    if len(trades) != 2:  # Ensure we have two related rows (EUR + crypto)
        raise ValueError(f"Unexpected number of trades for refid {refid}")

    eur_trade = next((t for t in trades if t.asset == "EUR"), None)
    crypto_trade = next((t for t in trades if t.asset != "EUR"), None)
    if eur_trade is None or crypto_trade is None:
        raise ValueError(f"Unexpected trade grouping for refid {refid}")

    crypto_asset = crypto_trade.asset
    eur_amount = Decimal(eur_trade.amount)
    eur_fee = Decimal(eur_trade.fee)
    crypto_amount = Decimal(crypto_trade.amount)
    crypto_fee = Decimal(crypto_trade.fee)

    queue = fifo_queues.setdefault(crypto_asset, FIFOQueue())
    trade_date = eur_trade.date

    if eur_amount < 0:  # Purchase of cryptocurrency
        # Kraken charges the (positive) fee on top of the debited amount,
        # so the total EUR spent is |amount| + fee.
        stake_amount = -eur_amount + eur_fee
        crypto_amount -= crypto_fee  # Coins actually received after fees
        queue.add(crypto_amount, stake_amount, trade_date)
    elif eur_amount > 0:  # Sale of cryptocurrency
        proceeds = eur_amount - eur_fee  # EUR actually received after fees
        sale_entries = queue.remove_coins(-crypto_amount)
        report.extend(
            generate_report(sale_entries, proceeds, crypto_asset, trade_date)
        )


def process_ledger(read_actions: List[LedgerAction], output_path: str):
    """Replay ledger actions through the FIFO queues and write the CSV report.

    Rows are grouped by refid and the groups are processed in chronological
    order (stable sort by date, ties keep ledger order) so that FIFO matching
    sees purchases before the sales that consume them.

    Args:
        read_actions: Ledger rows, e.g. from ``read_kraken_ledger``.
        output_path: Destination path of the report CSV.

    Raises:
        ValueError: If a trade group is malformed (see ``process_trade``) or
            a crypto deposit/withdrawal refid has more than one row.

    Note:
        Results accumulate in the module-level ``fifo_queues`` and ``report``.
    """
    # Group trades by refid, keeping first-appearance order of the refids.
    grouped: Dict[str, List[LedgerAction]] = defaultdict(list)
    for entry in read_actions:
        grouped[entry.refid].append(entry)

    # Dates are "YYYY-MM-DD" strings, so string order is chronological order.
    ordered_groups = sorted(grouped.items(), key=lambda item: item[1][0].date)

    for refid, actions in ordered_groups:
        action = actions[0]

        if action.type == "trade":
            process_trade(refid, actions)
            continue

        if action.asset == "EUR" or action.type not in ("deposit", "withdrawal"):
            continue

        if len(actions) != 1:
            raise ValueError(
                f"Expected a single ledger row for {action.type} refid {refid}, "
                f"got {len(actions)}"
            )

        current = fifo_queues.setdefault(action.asset, FIFOQueue())
        amount = Decimal(action.amount)
        price = 0  # No cost basis is recorded for transfers

        if action.type == "deposit":
            # remove transaction fees
            # (but only if it has a previous withdraw, or rather, if these
            # are coins from a previous purchase... but I can't check that...)
            if len(current):
                current.remove_coins(action.fee)
            current.add(amount, price, action.date)
        else:
            # NOTE(review): withdrawals are recorded through the same `add`
            # call as deposits. Kraken presumably reports withdrawal amounts
            # as negative numbers -- confirm FIFOQueue.add is meant to
            # receive them rather than remove_coins.
            current.add(amount, price, action.date)

    # Write report to CSV
    fieldnames = [
        "Amount",
        "Currency",
        "Date Sold",
        "Date Acquired",
        "Short/Long",
        "Buy/Input at",
        "Sell/Output at",
        "Proceeds",
        "Cost Basis",
        "Gain/Loss",
    ]
    with open(output_path, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(report)


logging.basicConfig(level=logging.DEBUG)

# Usage
# NOTE(review): this runs at import time; consider moving it behind an
# `if __name__ == "__main__":` guard once no caller relies on import-time
# execution.
ledger_path = "kraken_ledger.csv"  # Replace with your ledger file path
output_path = "tax_report.csv"  # Replace with your desired output file path
actions = read_kraken_ledger(ledger_path)
process_ledger(actions, output_path)