"""Build a FIFO capital-gains report from a Kraken ledger CSV export."""

import csv
from collections import defaultdict, deque
from datetime import datetime

# Date layout of the ledger's "time" column (date part) and of the report.
_LEDGER_DATE_FORMAT = "%Y-%m-%d"
_REPORT_DATE_FORMAT = "%d.%m.%Y"

# Asset codes treated as the euro leg of a trade. NOTE(review): Kraken
# exports appear to use either the plain or the Z-prefixed code depending on
# the export vintage -- confirm against your own ledger.
_FIAT_ASSETS = frozenset({"EUR", "ZEUR"})

# Relative tolerance below which a leftover amount counts as zero, so float
# rounding residue from earlier partial sales cannot fail a later sale.
_DUST_TOLERANCE = 1e-12


class Trade:
    """One acquired lot: how much was bought, what it cost in total, and when."""

    def __init__(self, amount, total_cost, date):
        self.amount = amount
        self.total_cost = total_cost
        self.date = date

    def __repr__(self):
        return f"Trade(amount={self.amount}, total_cost={self.total_cost}, date={self.date})"

    def __iter__(self):
        # Lets a Trade be unpacked like an (amount, total_cost, date) tuple,
        # which is the shape generate_report() consumes.
        yield self.amount
        yield self.total_cost
        yield self.date


class FIFOQueue:
    """
    Crypto trading FIFO queue. Will track trades.
    """

    def __init__(self):
        self.queue = deque()

    def add(self, amount, total_cost, date):
        """
        Add a trade to the queue.
        """
        self.queue.append(Trade(amount, total_cost, date))

    def remove(self, amount):
        """
        Remove a specified amount from the queue, returning the trades used to buy.

        Lots are consumed oldest first. When only part of a lot is needed, the
        lot's cost is split proportionally between the returned slice and the
        part that stays in the queue.

        Args:
            amount: Positive quantity to take out of the queue.

        Returns:
            A list of Trade objects whose amounts add up to ``amount``.

        Raises:
            ValueError: If ``amount`` is not positive, or the queue holds less
                than ``amount``.
        """
        if amount <= 0:
            raise ValueError("The amount to remove must be positive.")

        tolerance = _DUST_TOLERANCE * max(1.0, amount)
        remaining = amount
        entries = []
        while remaining > tolerance:
            if not self.queue:
                raise ValueError(f"Insufficient assets in queue to process sale of {amount}.")
            lot = self.queue[0]
            if lot.amount > remaining:
                # Partial consumption: carve out a proportional share of the
                # lot's cost and leave the rest with the lot.
                cost_share = lot.total_cost * remaining / lot.amount
                entries.append(Trade(remaining, cost_share, lot.date))
                lot.amount -= remaining
                lot.total_cost -= cost_share
                remaining = 0
            else:
                remaining -= lot.amount
                entries.append(self.queue.popleft())
        return entries


def generate_report(sale_entries, proceeds, crypto_asset, date_sold):
    """Turn the lots consumed by one sale into report rows.

    Args:
        sale_entries: Iterable of lots, each unpackable as
            ``(amount, cost_basis, buy_date)`` -- Trade objects or tuples.
            ``buy_date`` is a ``YYYY-MM-DD`` string.
        proceeds: Net proceeds of the whole sale; split across the lots in
            proportion to their amounts.
        crypto_asset: Asset code written to the "Currency" column.
        date_sold: Sale date as a ``YYYY-MM-DD`` string.

    Returns:
        A list with one dict per lot, keyed by the report column names.
    """
    sold_on = datetime.strptime(date_sold, _LEDGER_DATE_FORMAT)
    sell_date = sold_on.strftime(_REPORT_DATE_FORMAT)

    lots = [tuple(entry) for entry in sale_entries]
    total_amount = sum(lot[0] for lot in lots)

    report = []
    for amount, cost_basis, buy_date in lots:
        acquired_on = datetime.strptime(buy_date, _LEDGER_DATE_FORMAT)
        holding_period = (sold_on - acquired_on).days
        short_or_long = "Short" if holding_period < 365 else "Long"

        # Each lot only earns its share of the sale, otherwise a sale that
        # spans several lots would report the proceeds several times.
        share = amount / total_amount if total_amount else 1.0
        lot_proceeds = proceeds * share
        gain_or_loss = lot_proceeds - cost_basis

        report.append({
            "Amount": f"{amount:.8f}",
            "Currency": crypto_asset,
            "Date Sold": sell_date,
            "Date Acquired": acquired_on.strftime(_REPORT_DATE_FORMAT),
            "Short/Long": short_or_long,
            "Buy/Input at": "Kraken",
            "Sell/Output at": "Kraken",
            "Proceeds": f"{lot_proceeds:.2f}",
            "Cost Basis": f"{cost_basis:.2f}",
            "Gain/Loss": f"{gain_or_loss:.2f}",
        })
    return report


def _queue_for(fifo_queues, asset):
    """Return the FIFO queue of ``asset``, creating it on first use."""
    if asset not in fifo_queues:
        fifo_queues[asset] = FIFOQueue()
    return fifo_queues[asset]


def _record_deposit(fifo_queues, row):
    """Queue a deposited amount as a lot with zero cost basis."""
    amount = float(row["amount"])
    price = 0  # Deposits typically have no associated cost basis
    date = row["time"].split(" ")[0]
    _queue_for(fifo_queues, row["asset"]).add(amount, price, date)


def _process_trade(fifo_queues, refid, rows):
    """Apply one EUR<->crypto trade to the queues; return its report rows."""
    if len(rows) != 2:  # Ensure we have two related rows (EUR + crypto)
        raise ValueError(f"Unexpected number of trades for refid {refid}")

    eur_row = next((row for row in rows if row["asset"] in _FIAT_ASSETS), None)
    crypto_row = next((row for row in rows if row["asset"] not in _FIAT_ASSETS), None)
    if not eur_row or not crypto_row:
        raise ValueError(f"Unexpected trade grouping for refid {refid}")

    crypto_asset = crypto_row["asset"]
    eur_amount = float(eur_row["amount"])
    eur_fee = float(eur_row["fee"])
    crypto_amount = float(crypto_row["amount"])
    crypto_fee = float(crypto_row["fee"])
    trade_date = eur_row["time"].split(" ")[0]
    queue = _queue_for(fifo_queues, crypto_asset)

    if eur_amount < 0:  # Purchase of cryptocurrency
        # The fee is debited on top of the (negative) amount, so it raises
        # the total cost of the lot.
        total_cost = -eur_amount + eur_fee
        received = crypto_amount - crypto_fee  # Adjust for crypto fees
        queue.add(received, total_cost, trade_date)
        return []

    if eur_amount > 0:  # Sale of cryptocurrency
        proceeds = eur_amount - eur_fee  # Account for EUR fees
        sale_entries = queue.remove(-crypto_amount)
        return generate_report(sale_entries, proceeds, crypto_asset, trade_date)

    return []


def process_ledger(file_path, output_path):
    """Read a Kraken ledger CSV and write a FIFO gain/loss report CSV.

    Rows of type "deposit" add zero-cost lots; rows of type "trade" are paired
    by ``refid`` into an EUR leg and a crypto leg. Purchases add lots, sales
    consume the oldest lots first and produce report rows. All other row
    types are ignored. Events are applied in chronological order.

    Args:
        file_path: Path of the ledger CSV. It must provide the columns
            ``type``, ``refid``, ``asset``, ``amount``, ``fee`` and ``time``.
        output_path: Path of the report CSV to create or overwrite.

    Raises:
        ValueError: If a trade ``refid`` does not have exactly one EUR row and
            one crypto row, or a sale exceeds the tracked holdings.
    """
    fifo_queues = {}  # Separate FIFO queue per cryptocurrency
    trades_by_refid = defaultdict(list)
    events = []  # (timestamp, kind, payload)

    with open(file_path, "r", newline="", encoding="utf-8") as file:
        for row in csv.DictReader(file):
            if row["type"] == "trade":
                refid = row["refid"]
                if refid not in trades_by_refid:
                    events.append((row["time"], "trade", refid))
                trades_by_refid[refid].append(row)
            elif row["type"] == "deposit":
                events.append((row["time"], "deposit", row))

    # FIFO only makes sense in chronological order. The "YYYY-MM-DD ..."
    # timestamps sort correctly as text, and the stable sort keeps the ledger
    # order for events that share a timestamp.
    events.sort(key=lambda event: event[0])

    report = []
    for _, kind, payload in events:
        if kind == "deposit":
            _record_deposit(fifo_queues, payload)
        else:
            report.extend(_process_trade(fifo_queues, payload, trades_by_refid[payload]))

    # Write report to CSV
    fieldnames = [
        "Amount", "Currency", "Date Sold", "Date Acquired",
        "Short/Long", "Buy/Input at", "Sell/Output at",
        "Proceeds", "Cost Basis", "Gain/Loss",
    ]
    with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(report)


# Usage
ledger_path = "kraken_ledger.csv"  # Replace with your ledger file path
output_path = "tax_report.csv"  # Replace with your desired output file path

if __name__ == "__main__":
    process_ledger(ledger_path, output_path)