import csv from collections import defaultdict from datetime import datetime from decimal import Decimal from typing import Dict, List from trade import Trade from trade_queue import FIFOQueue def generate_report(sale_entries, proceeds: float|Decimal, crypto_asset, date_sold): report = [] sell_date = datetime.strptime(date_sold, "%Y-%m-%d").strftime("%d.%m.%Y") proceeds = Decimal(proceeds) trade: Trade for trade in sale_entries: buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime("%d.%m.%Y") holding_period = (datetime.strptime(date_sold, "%Y-%m-%d") - datetime.strptime(trade.date, "%Y-%m-%d")).days short_or_long = "Short" if holding_period < 365 else "Long" total_cost = trade.total_cost gain_or_loss = proceeds - total_cost report.append({ "Amount": f"{trade.amount:.8f}", "Currency": crypto_asset, "Date Sold": sell_date, "Date Acquired": buy_date_formatted, "Short/Long": short_or_long, "Buy/Input at": "Kraken", "Sell/Output at": "Kraken", "Proceeds": f"{proceeds:.2f}", "Cost Basis": f"{total_cost:.2f}", "Gain/Loss": f"{gain_or_loss:.2f}", }) return report def process_ledger(file_path, output_path): fifo_queues :Dict[str,FIFOQueue] = {} # Separate FIFO queue per cryptocurrency trades_by_refid = defaultdict(list) report = [] with open(file_path, 'r') as file: reader = csv.DictReader(file) for row in reader: # Group trades by refid if row["type"] == "trade": trades_by_refid[row["refid"]].append(row) # Handle deposits elif row["type"] == "deposit" and row["asset"] != "EUR": currency = row["asset"] fifo_queues.setdefault(currency, FIFOQueue()) amount = float(row["amount"]) price = 0 # Deposits typically have no associated cost basis date = row["time"].split(" ")[0] fifo_queues[currency].add(amount, price, date) # Process grouped trades for refid, trades in trades_by_refid.items(): if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) eur_trade = next((trade for trade in trades if trade["asset"] == "EUR"), None) crypto_trade = next((trade for trade in trades if trade["asset"] != "EUR"), None) if eur_trade and crypto_trade: crypto_asset = crypto_trade["asset"] eur_amount = float(eur_trade["amount"]) eur_fee = float(eur_trade["fee"]) crypto_amount = float(crypto_trade["amount"]) crypto_fee = float(crypto_trade["fee"]) fifo_queues.setdefault(crypto_asset, FIFOQueue()) date_sold = eur_trade["time"].split(" ")[0] if eur_amount < 0: # Purchase of cryptocurrency stake_amount = -eur_amount - eur_fee # Account for EUR fees crypto_amount -= crypto_fee # Adjust for crypto fees fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold) elif eur_amount > 0: # Sale of cryptocurrency proceeds = eur_amount - eur_fee # Account for EUR fees sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold)) else: raise ValueError(f"Unexpected trade grouping for refid {refid}") else: raise ValueError(f"Unexpected number of trades for refid {refid}") # Write report to CSV with open(output_path, 'w', newline='') as csvfile: fieldnames = [ "Amount", "Currency", "Date Sold", "Date Acquired", "Short/Long", "Buy/Input at", "Sell/Output at", "Proceeds", "Cost Basis", "Gain/Loss" ] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() writer.writerows(report) # Usage ledger_path = "kraken_ledger.csv" # Replace with your ledger file path output_path = "tax_report.csv" # Replace with your desired output file path process_ledger(ledger_path, output_path)