summaryrefslogtreecommitdiff
path: root/bla.py
blob: 4fc34757c9752f986e301f453de59aa5d8e8e148 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import csv
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
import logging
from typing import Dict, List

from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue


def generate_report(sale_entries, proceeds: float|Decimal, crypto_asset, date_sold):
    report = []
    sell_date = datetime.strptime(date_sold, "%Y-%m-%d").strftime("%d.%m.%Y")
    proceeds = Decimal(proceeds)

    trade: Trade
    for trade in sale_entries:
        buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime("%d.%m.%Y")
        holding_period = (datetime.strptime(date_sold, "%Y-%m-%d") - datetime.strptime(trade.date, "%Y-%m-%d")).days
        short_or_long = "Short" if holding_period < 365 else "Long"
        total_cost = trade.total_cost
        gain_or_loss = proceeds - total_cost

        report.append({
            "Amount": f"{trade.amount:.8f}",
            "Currency": crypto_asset,
            "Date Sold": sell_date,
            "Date Acquired": buy_date_formatted,
            "Short/Long": short_or_long,
            "Buy/Input at": "Kraken",
            "Sell/Output at": "Kraken",
            "Proceeds": f"{proceeds:.2f}",
            "Cost Basis": f"{total_cost:.2f}",
            "Gain/Loss": f"{gain_or_loss:.2f}",
        })

    return report

def read_kraken_ledger(csv_path: str) -> List[LedgerAction]:
    actions :List[LedgerAction] = []

    with open(csv_path, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            date = row["time"].split(" ")[0]
            actions.append(LedgerAction(
                type=row["type"],
                asset=row["asset"],
                amount=Decimal(row["amount"]),
                fee=Decimal(row.get("fee", "0")),
                refid=row.get("refid", ""),
                date=date
            ))

    return actions

def process_ledger(file_path :str, output_path :str):
    fifo_queues :Dict[str,FIFOQueue] = {}  # Separate FIFO queue per cryptocurrency
    trades_by_refid :Dict[str, list[LedgerAction]] = defaultdict(list)
    report = []
    actions = read_kraken_ledger(file_path)

    for action in actions:
        # Group trades by refid
        if action.type == "trade":
            trades_by_refid[action.refid].append(action)

        # Handle deposits
        elif action.type == "deposit" and action.asset != "EUR":
            currency = action.asset
            fifo_queues.setdefault(currency, FIFOQueue())
            amount = Decimal(action.amount)
            price = 0  # Deposits typically have no associated cost basis
            fifo_queues[currency].add(amount, price, action.date)

    # Process grouped trades
    for refid, trades in trades_by_refid.items():
        if len(trades) == 2:  # Ensure we have two related rows (EUR + crypto)
            eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
            crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None)

            if eur_trade and crypto_trade:
                crypto_asset = crypto_trade.asset
                eur_amount = Decimal(eur_trade.amount)
                eur_fee = Decimal(eur_trade.fee)
                crypto_amount = Decimal(crypto_trade.amount)
                crypto_fee = Decimal(crypto_trade.fee)
                fifo_queues.setdefault(crypto_asset, FIFOQueue())

                date_sold = eur_trade.date

                if eur_amount < 0:  # Purchase of cryptocurrency
                    stake_amount = -eur_amount - eur_fee  # Account for EUR fees
                    crypto_amount -= crypto_fee  # Adjust for crypto fees
                    fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold)
                elif eur_amount > 0:  # Sale of cryptocurrency
                    proceeds = eur_amount - eur_fee  # Account for EUR fees
                    sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
                    report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold))
            else:
                raise ValueError(f"Unexpected trade grouping for refid {refid}")
        else:
            raise ValueError(f"Unexpected number of trades for refid {refid}")

    # Write report to CSV
    with open(output_path, 'w', newline='') as csvfile:
        fieldnames = [
            "Amount",
            "Currency",
            "Date Sold",
            "Date Acquired",
            "Short/Long",
            "Buy/Input at",
            "Sell/Output at",
            "Proceeds",
            "Cost Basis",
            "Gain/Loss"
        ]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(report)

logging.basicConfig(level=logging.DEBUG)

# Usage
ledger_path = "kraken_ledger.csv"  # Replace with your ledger file path
output_path = "tax_report.csv"  # Replace with your desired output file path
process_ledger(ledger_path, output_path)