1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
import csv
from datetime import datetime
from decimal import Decimal
from itertools import groupby
import logging
from typing import Dict, List
from kraken import read_ledger
from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue
# Root logging configuration: WARNING and above, with timestamp, level and
# logger name in front of every message.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    level=logging.WARNING,
)
# Logger used by this module; it is registered under the name "pnlcalc".
logger = logging.getLogger("pnlcalc")
def generate_report(sale_entries, proceeds: Decimal, crypto_asset, date_sold):
    """Build one tax-report row per acquisition lot consumed by a sale.

    The proceeds of the sale are split across the lots in proportion to each
    lot's amount, so the per-row gains add up to the gain of the whole sale
    (apart from the rounding to cents applied when formatting).

    Args:
        sale_entries: Iterable of the lots covered by the sale. Each entry
            must expose ``amount``, ``total_cost`` and ``date`` (a
            ``YYYY-MM-DD`` string).
        proceeds: Proceeds of the whole sale; any value ``Decimal()`` accepts.
        crypto_asset: Asset code written to the ``Currency`` column.
        date_sold: Date of the sale as a ``YYYY-MM-DD`` string.

    Returns:
        A list of dicts, one per lot, keyed by the report column names. All
        values are strings; dates are formatted as ``DD.MM.YYYY``.

    Raises:
        ValueError: If ``date_sold`` or a lot's ``date`` is not in
            ``YYYY-MM-DD`` format.
    """
    # Parse the sale date once; it is the same for every row.
    sold_on = datetime.strptime(date_sold, "%Y-%m-%d")
    sell_date = sold_on.strftime("%d.%m.%Y")
    proceeds = Decimal(proceeds)

    # Materialise the lots: they are walked twice (total, then rows).
    lots = list(sale_entries)
    total_amount = sum((Decimal(lot.amount) for lot in lots), Decimal(0))

    report = []
    trade: Trade
    for trade in lots:
        acquired_on = datetime.strptime(trade.date, "%Y-%m-%d")
        holding_period = (sold_on - acquired_on).days
        # Lots held for fewer than 365 days are labelled "Short".
        short_or_long = "Short" if holding_period < 365 else "Long"

        if total_amount:
            # This lot's share of the sale proceeds, weighted by amount.
            lot_proceeds = proceeds * Decimal(trade.amount) / total_amount
        else:
            # No meaningful weighting is possible when the amounts sum to
            # zero; fall back to the undivided proceeds instead of failing.
            lot_proceeds = proceeds

        total_cost = trade.total_cost
        gain_or_loss = lot_proceeds - total_cost
        report.append(
            {
                "Amount": f"{trade.amount:.8f}",
                "Currency": crypto_asset,
                "Date Sold": sell_date,
                "Date Acquired": acquired_on.strftime("%d.%m.%Y"),
                "Short/Long": short_or_long,
                "Buy/Input at": "Kraken",
                "Sell/Output at": "Kraken",
                "Proceeds": f"{lot_proceeds:.2f}",
                "Cost Basis": f"{total_cost:.2f}",
                "Gain/Loss": f"{gain_or_loss:.2f}",
            }
        )
    return report
# Rows destined for the CSV report; each row is a dict keyed by column name.
report: List[Dict[str, str]] = []
# One FIFO queue per cryptocurrency, keyed by the asset code.
fifo_queues: Dict[str, FIFOQueue] = {}
def process_trade(refid: str, trades: List[LedgerAction]):
    """Add one EUR/crypto trade to the FIFO queue of the traded asset.

    A trade is expected to consist of exactly two ledger rows sharing
    ``refid``: one for EUR and one for the crypto asset.

    * EUR amount < 0 (purchase): queues a ``Trade`` built from the crypto
      amount net of the crypto fee and the EUR spent including the EUR fee.
    * EUR amount > 0 (sale): queues a ``Trade`` built from the crypto amount
      as booked and the negated EUR proceeds net of the EUR fee.
    * EUR amount == 0: nothing is queued; a warning is logged.

    Args:
        refid: Reference id shared by the rows of this trade.
        trades: The ledger rows belonging to ``refid``.

    Raises:
        ValueError: If there are not exactly two rows, or if the rows are not
            one EUR row plus one non-EUR row.
    """
    if len(trades) != 2:
        raise ValueError(f"Unexpected number of trades for refid {refid}")

    eur_trade = next((row for row in trades if row.asset == "EUR"), None)
    crypto_trade = next((row for row in trades if row.asset != "EUR"), None)
    # Compare against None explicitly instead of relying on the truthiness
    # of the ledger objects.
    if eur_trade is None or crypto_trade is None:
        raise ValueError(f"Unexpected trade grouping for refid {refid}")

    crypto_asset = crypto_trade.asset
    eur_amount = Decimal(eur_trade.amount)
    eur_fee = Decimal(eur_trade.fee)
    crypto_amount = Decimal(crypto_trade.amount)
    crypto_fee = Decimal(crypto_trade.fee)
    trade_date = eur_trade.date

    queue = fifo_queues.setdefault(crypto_asset, FIFOQueue())

    if eur_amount < 0:  # Purchase of cryptocurrency
        # The fee is a positive charge on top of the (negative) amount, the
        # same convention the sale branch relies on, so the EUR actually
        # spent is the absolute amount PLUS the fee.
        stake_amount = -eur_amount + eur_fee
        # A fee charged in crypto reduces the coins actually received.
        received = crypto_amount - crypto_fee
        queue.add_trade(Trade(received, stake_amount, trade_date, refid=refid))
    elif eur_amount > 0:  # Sale of cryptocurrency
        proceeds = eur_amount - eur_fee
        # NOTE(review): a fee charged in crypto on a sale is not applied to
        # crypto_amount here - confirm whether that is intended.
        queue.add_trade(Trade(crypto_amount, -proceeds, trade_date, refid=refid))
        # TODO: sales are only queued so far; consuming lots from the queue
        # and extending the report via generate_report() is not wired up.
    else:
        # Neither a purchase nor a sale; make the skipped trade visible.
        logger.warning("Ignoring trade %s with zero EUR amount", refid)
def process_ledger(read_actions: List[LedgerAction], output_path: str):
    """Replay the ledger in chronological order and write the CSV report.

    Ledger rows are grouped by ``refid``; the groups are then processed in
    the order of their earliest timestamp. Groups whose first row has type
    ``"trade"`` are handed to ``process_trade``. Non-EUR deposits and
    withdrawals are not supported yet and only produce an error log entry.
    All other groups are ignored.

    Afterwards the module-level ``report`` rows are written to
    ``output_path`` as CSV, preceded by a header row.

    Args:
        read_actions: Ledger rows in any order.
        output_path: Path of the CSV file to (over)write.

    Raises:
        ValueError: If a non-EUR deposit or withdrawal group does not consist
            of exactly one row, or if ``process_trade`` rejects a trade.
    """
    # Make no assumptions about how the ledger is sorted: groupby() only
    # merges adjacent items, so sort by refid (then timestamp) first.
    actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp))
    grouped_actions = [
        (refid, list(group))
        for refid, group in groupby(actions_sorted, key=lambda a: a.refid)
    ]
    # Finally, order the groups by their first (earliest) timestamp.
    grouped_actions.sort(key=lambda item: item[1][0].timestamp)

    for refid, actions in grouped_actions:
        # groupby() never yields an empty group, so actions[0] always exists.
        action = actions[0]
        if action.type == "trade":
            process_trade(refid, actions)
        elif action.type == "deposit" and action.asset != "EUR":
            # Raise instead of assert: asserts are stripped under ``-O``.
            if len(actions) != 1:
                raise ValueError(
                    f"Unexpected number of deposit entries for refid {refid}"
                )
            # TODO: crypto deposits are not added to the FIFO queues yet.
            logger.error("Don't know how do handle deposits yet.")
        elif action.type == "withdrawal" and action.asset != "EUR":
            if len(actions) != 1:
                raise ValueError(
                    f"Unexpected number of withdrawal entries for refid {refid}"
                )
            # TODO: crypto withdrawals are not reflected in the FIFO queues yet.
            logger.error("Don't know how do handle withdrawals yet.")

    # Write report to CSV
    fieldnames = [
        "Amount",
        "Currency",
        "Date Sold",
        "Date Acquired",
        "Short/Long",
        "Buy/Input at",
        "Sell/Output at",
        "Proceeds",
        "Cost Basis",
        "Gain/Loss",
    ]
    # newline="" lets the csv module control line endings; the explicit
    # encoding keeps the output independent of the platform default.
    with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(report)
# Usage: read the ledger file via kraken.read_ledger and write the report CSV.
# NOTE(review): these statements run whenever the module is imported, not only
# when it is executed as a script - consider an `if __name__ == "__main__":`
# guard if this module is ever imported from elsewhere.
ledger_path = "ledgers.csv" # Replace with your ledger file path
output_path = "tax_report.csv" # Replace with your desired output file path
actions = read_ledger(ledger_path)
process_ledger(actions, output_path)
|