1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
import csv
from collections import defaultdict
from datetime import datetime
from trade_queue import FIFOQueue
def generate_report(sale_entries, proceeds, crypto_asset, date_sold):
    """Build one report row per FIFO lot consumed by a single sale.

    Args:
        sale_entries: Iterable of ``(amount, cost, buy_date)`` tuples, one per
            lot consumed by the sale. ``cost`` is used as that lot's cost
            basis and ``buy_date`` is a ``YYYY-MM-DD`` string.
        proceeds: Total proceeds of the whole sale. They are split across the
            lots in proportion to each lot's amount, so the rows of one sale
            add up to ``proceeds`` instead of each repeating the full total.
        crypto_asset: Asset code written to the "Currency" column.
        date_sold: Sale date as a ``YYYY-MM-DD`` string.

    Returns:
        A list of dicts (one per lot) whose keys are the report columns;
        amounts and money values are pre-formatted strings and dates are
        rendered as ``DD.MM.YYYY``.

    Raises:
        ValueError: If ``date_sold`` or a lot's ``buy_date`` does not match
            ``%Y-%m-%d``.
    """
    # Materialize so the lots can be summed and then iterated.
    lots = list(sale_entries)

    # Parse the sale date once; it is the same for every lot.
    sold_on = datetime.strptime(date_sold, "%Y-%m-%d")
    sell_date = sold_on.strftime("%d.%m.%Y")

    total_amount = sum(amount for amount, _cost, _buy_date in lots)

    report = []
    for amount, cost, buy_date in lots:
        bought_on = datetime.strptime(buy_date, "%Y-%m-%d")
        holding_period = (sold_on - bought_on).days
        short_or_long = "Short" if holding_period < 365 else "Long"

        # Allocate the sale's proceeds to this lot by its share of the amount
        # sold. ``amount / total_amount`` is exactly 1.0 for a single lot, so
        # a one-lot sale keeps the full proceeds unchanged.
        if total_amount:
            lot_proceeds = proceeds * (amount / total_amount)
        else:
            # Degenerate zero-amount sale: split evenly to avoid dividing by 0.
            lot_proceeds = proceeds / len(lots)

        cost_basis = cost
        gain_or_loss = lot_proceeds - cost_basis
        report.append({
            "Amount": f"{amount:.8f}",
            "Currency": crypto_asset,
            "Date Sold": sell_date,
            "Date Acquired": bought_on.strftime("%d.%m.%Y"),
            "Short/Long": short_or_long,
            "Buy/Input at": "Kraken",
            "Sell/Output at": "Kraken",
            "Proceeds": f"{lot_proceeds:.2f}",
            "Cost Basis": f"{cost_basis:.2f}",
            "Gain/Loss": f"{gain_or_loss:.2f}",
        })
    return report
def process_ledger(file_path, output_path):
    """Turn a ledger CSV into a FIFO gain/loss report written as CSV.

    The ledger is read with ``csv.DictReader`` and must provide the columns
    ``type``, ``refid``, ``asset``, ``amount``, ``fee`` and ``time``. Rows of
    type ``deposit`` add a zero-cost lot to the asset's FIFO queue. Rows of
    type ``trade`` are paired by ``refid`` (one EUR row plus one non-EUR row):
    a negative EUR amount is a purchase that adds a lot, a positive EUR amount
    is a sale that removes lots and produces report rows. Other row types are
    ignored.

    Deposits and trades are applied in the order they appear in the ledger
    (a trade counts at the position of its first row), so a deposit listed
    after a purchase is also queued after it.

    Args:
        file_path: Path of the ledger CSV to read.
        output_path: Path of the report CSV to write (overwritten).

    Raises:
        ValueError: If a ``refid`` does not group exactly two trade rows, or
            the pair is not one EUR row plus one non-EUR row. Nothing is
            written to ``output_path`` in that case.
    """
    fifo_queues = {}  # Separate FIFO queue per asset
    trades_by_refid = defaultdict(list)
    # Ledger events in file order: ("deposit", row) or ("trade", refid).
    # A refid is recorded once, at the position of its first trade row.
    ledger_events = []

    # newline="" lets the csv module handle embedded newlines itself.
    with open(file_path, "r", newline="") as file:
        for row in csv.DictReader(file):
            row_type = row["type"]
            if row_type == "trade":
                refid = row["refid"]
                if refid not in trades_by_refid:
                    ledger_events.append(("trade", refid))
                trades_by_refid[refid].append(row)
            elif row_type == "deposit":
                ledger_events.append(("deposit", row))

    report = []
    for kind, payload in ledger_events:
        if kind == "deposit":
            currency = payload["asset"]
            amount = float(payload["amount"])
            date = payload["time"].split(" ")[0]  # keep the date part only
            # Deposits are booked with a cost of 0 (no cost basis is known).
            fifo_queues.setdefault(currency, FIFOQueue()).add(amount, 0, date)
            continue

        refid = payload
        trades = trades_by_refid[refid]
        if len(trades) != 2:  # Expect two related rows (EUR + crypto)
            raise ValueError(f"Unexpected number of trades for refid {refid}")

        eur_trade = next((t for t in trades if t["asset"] == "EUR"), None)
        crypto_trade = next((t for t in trades if t["asset"] != "EUR"), None)
        if not (eur_trade and crypto_trade):
            raise ValueError(f"Unexpected trade grouping for refid {refid}")

        crypto_asset = crypto_trade["asset"]
        eur_amount = float(eur_trade["amount"])
        eur_fee = float(eur_trade["fee"])
        crypto_amount = float(crypto_trade["amount"])
        crypto_fee = float(crypto_trade["fee"])
        queue = fifo_queues.setdefault(crypto_asset, FIFOQueue())
        trade_date = eur_trade["time"].split(" ")[0]

        if eur_amount < 0:  # Purchase of cryptocurrency
            # The fee is paid on top of the traded amount, so it raises the
            # cost of the lot (mirrors the sale branch, where the same
            # positive fee lowers the proceeds).
            stake_amount = -eur_amount + eur_fee
            received = crypto_amount - crypto_fee  # net of crypto fees
            queue.add(received, stake_amount, trade_date)
        elif eur_amount > 0:  # Sale of cryptocurrency
            proceeds = eur_amount - eur_fee  # net of EUR fees
            # NOTE(review): a crypto-denominated fee on a sale is not removed
            # from the queue here - confirm whether it should be.
            sale_entries = queue.remove(-crypto_amount)
            report.extend(
                generate_report(sale_entries, proceeds, crypto_asset, trade_date)
            )

    # Write report to CSV
    fieldnames = [
        "Amount",
        "Currency",
        "Date Sold",
        "Date Acquired",
        "Short/Long",
        "Buy/Input at",
        "Sell/Output at",
        "Proceeds",
        "Cost Basis",
        "Gain/Loss",
    ]
    with open(output_path, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(report)
# Script entry: choose the input ledger and the report destination, then run.
# Replace the first value with your ledger file path and the second with your
# desired output file path.
ledger_path, output_path = "kraken_ledger.csv", "tax_report.csv"
process_ledger(file_path=ledger_path, output_path=output_path)
|