summaryrefslogtreecommitdiff
path: root/pnlcalc.py
blob: 309a0b064895cdb85e8b92d29208e4600ba40541 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import csv
from datetime import datetime
from decimal import Decimal
from itertools import groupby
import logging
from typing import Dict, List

from kraken import read_ledger
from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue

# Configure the root logger: WARNING and above, each record prefixed with
# timestamp, level and logger name.
logging.basicConfig(
    level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s"
)

# Logger used by this script; its records are emitted under the name "pnlcalc".
logger = logging.getLogger("pnlcalc")


def generate_report(sale_entries, proceeds: Decimal, crypto_asset, date_sold):
    """Build one report row for every acquisition lot consumed by a sale.

    Args:
        sale_entries: Iterable of lots. Each lot must expose ``date`` (a
            ``YYYY-MM-DD`` string), ``amount`` and ``total_cost``.
        proceeds: Proceeds of the sale; coerced to ``Decimal``.
        crypto_asset: Asset code written to the ``Currency`` column.
        date_sold: Sale date as a ``YYYY-MM-DD`` string.

    Returns:
        A list of dicts, one per lot, keyed by the report column names.
        Dates are rendered as ``DD.MM.YYYY`` and numbers as fixed-point
        strings (8 decimals for the amount, 2 for money columns).

    Raises:
        ValueError: If ``date_sold`` or a lot's ``date`` is not in
            ``YYYY-MM-DD`` format.
    """
    iso_format = "%Y-%m-%d"
    display_format = "%d.%m.%Y"

    sold_at = datetime.strptime(date_sold, iso_format)
    sold_label = sold_at.strftime(display_format)
    sale_proceeds = Decimal(proceeds)

    def build_row(lot):
        # `lot` is expected to be a Trade (see the attributes listed above).
        acquired_at = datetime.strptime(lot.date, iso_format)
        days_held = (sold_at - acquired_at).days
        cost_basis = lot.total_cost
        # NOTE(review): the full sale proceeds are compared against each
        # lot's cost, so a sale spanning several lots repeats the same
        # proceeds on every row — confirm whether proceeds should be
        # apportioned per lot.
        profit = sale_proceeds - cost_basis
        return {
            "Amount": f"{lot.amount:.8f}",
            "Currency": crypto_asset,
            "Date Sold": sold_label,
            "Date Acquired": acquired_at.strftime(display_format),
            # Lots held for 365 days or more count as long-term.
            "Short/Long": "Long" if days_held >= 365 else "Short",
            "Buy/Input at": "Kraken",
            "Sell/Output at": "Kraken",
            "Proceeds": f"{sale_proceeds:.2f}",
            "Cost Basis": f"{cost_basis:.2f}",
            "Gain/Loss": f"{profit:.2f}",
        }

    return [build_row(lot) for lot in sale_entries]


# Module-level state shared by the functions in this file.
# Maps an asset code to its FIFOQueue; process_trade creates entries on first use.
fifo_queues: Dict[str, FIFOQueue] = {}  # Separate FIFO queue per cryptocurrency
# Rows that process_ledger writes to the CSV.
# NOTE(review): nothing in this file currently appends to this list (the
# generate_report call in process_trade is commented out), so the CSV ends up
# with a header only — confirm this is intended.
report = []


def process_trade(refid: str, trades: List[LedgerAction]):
    """Record one EUR/crypto trade in the FIFO queue of the crypto asset.

    A trade is expected to consist of exactly two ledger rows sharing
    ``refid``: one with asset ``"EUR"`` and one with any other asset.

    Args:
        refid: Reference id shared by the rows; passed on to ``Trade`` and
            used in error messages.
        trades: The ledger rows belonging to this refid.

    Raises:
        ValueError: If there are not exactly two rows, or if the rows do
            not form one EUR leg plus one non-EUR leg.
    """
    if len(trades) != 2:  # We need two related rows (EUR + crypto)
        raise ValueError(f"Unexpected number of trades for refid {refid}")

    eur_leg = next((row for row in trades if row.asset == "EUR"), None)
    crypto_leg = next((row for row in trades if row.asset != "EUR"), None)
    if not (eur_leg and crypto_leg):
        raise ValueError(f"Unexpected trade grouping for refid {refid}")

    asset_code = crypto_leg.asset
    eur_delta = Decimal(eur_leg.amount)
    eur_fee = Decimal(eur_leg.fee)
    crypto_delta = Decimal(crypto_leg.amount)
    crypto_fee = Decimal(crypto_leg.fee)
    # Create the queue for this asset on first use.
    queue = fifo_queues.setdefault(asset_code, FIFOQueue())

    trade_date = eur_leg.date

    if eur_delta < 0:  # EUR left the account: purchase of cryptocurrency
        # NOTE(review): the EUR fee is subtracted from the amount spent;
        # whether that is right depends on the sign convention of
        # LedgerAction.fee — confirm against read_ledger.
        eur_spent = -eur_delta - eur_fee
        coins_received = crypto_delta - crypto_fee
        queue.add_trade(Trade(coins_received, eur_spent, trade_date, refid=refid))
    elif eur_delta > 0:  # EUR entered the account: sale of cryptocurrency
        proceeds = eur_delta - eur_fee
        # The sale is queued with the crypto amount as read from the ledger
        # and with negated proceeds; the crypto fee is not applied here.
        queue.add_trade(Trade(crypto_delta, -proceeds, trade_date, refid=refid))
        # Report generation is currently disabled:
        # sale_entries = queue.remove_coins(-crypto_delta)
        # report.extend(
        #     generate_report(sale_entries, proceeds, asset_code, trade_date)
        # )
    # NOTE(review): a EUR amount of exactly zero falls through silently.


def process_ledger(read_actions: List[LedgerAction], output_path: str):
    """Replay ledger rows in chronological order and write the report CSV.

    Rows are grouped by ``refid`` and the groups are processed in order of
    their earliest timestamp. The type of a group's first row decides how
    it is handled:

    * ``"trade"`` groups are passed to ``process_trade``.
    * Non-EUR ``"deposit"`` and ``"withdrawal"`` groups are not supported
      yet and are only logged.
    * Everything else (including EUR deposits/withdrawals) is skipped.

    Finally the module-level ``report`` list is written to ``output_path``
    as CSV, overwriting any existing file.

    Args:
        read_actions: Ledger rows in any order; each must expose ``refid``,
            ``timestamp``, ``type`` and ``asset``.
        output_path: Path of the CSV file to write.

    Raises:
        ValueError: If a non-EUR deposit or withdrawal group does not
            consist of exactly one row, or if ``process_trade`` rejects a
            trade group.
    """
    # Don't make any assumptions about ledger sorting: groupby only merges
    # adjacent rows, so the input must be sorted by refid first.
    rows_by_refid = sorted(read_actions, key=lambda a: (a.refid, a.timestamp))
    groups = [
        (refid, list(rows))
        for refid, rows in groupby(rows_by_refid, key=lambda a: a.refid)
    ]
    # Finally, order the groups by their first (earliest) timestamp; rows
    # inside a group are already sorted by timestamp.
    groups.sort(key=lambda group: group[1][0].timestamp)

    for refid, rows in groups:
        # groupby never yields an empty group, so rows[0] always exists.
        action = rows[0]

        if action.type == "trade":
            process_trade(refid, rows)

        elif action.type == "deposit" and action.asset != "EUR":
            # Explicit check instead of `assert`, which is stripped under -O.
            if len(rows) != 1:
                raise ValueError(
                    f"Expected exactly one ledger row for deposit refid {refid}, "
                    f"got {len(rows)}"
                )
            logger.error("Don't know how do handle deposits yet.")
            # TODO: an earlier sketch removed the fee from the coins already
            # held in the asset's FIFO queue (if any) and then added the
            # deposited amount at price 0. It could not tell whether the
            # coins came from a previous purchase, so it was left disabled.

        elif action.type == "withdrawal" and action.asset != "EUR":
            if len(rows) != 1:
                raise ValueError(
                    f"Expected exactly one ledger row for withdrawal refid {refid}, "
                    f"got {len(rows)}"
                )
            logger.error("Don't know how do handle withdrawals yet.")
            # TODO: an earlier sketch added the amount to the asset's FIFO
            # queue at price 0; it was left disabled.

    # Write report to CSV
    fieldnames = [
        "Amount",
        "Currency",
        "Date Sold",
        "Date Acquired",
        "Short/Long",
        "Buy/Input at",
        "Sell/Output at",
        "Proceeds",
        "Cost Basis",
        "Gain/Loss",
    ]
    # newline="" lets the csv module control line endings itself.
    with open(output_path, "w", newline="") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(report)


# Usage
# NOTE(review): these statements run whenever the module is loaded (there is
# no `if __name__ == "__main__":` guard), so importing this file also reads
# the ledger and overwrites the output file.
ledger_path = "ledgers.csv"  # Replace with your ledger file path
output_path = "tax_report.csv"  # Replace with your desired output file path
# Load the ledger rows, then replay them and write the CSV report.
actions = read_ledger(ledger_path)
process_ledger(actions, output_path)