diff options
author | uvok | 2025-04-18 11:35:41 +0200 |
---|---|---|
committer | uvok | 2025-04-18 11:35:41 +0200 |
commit | d80b01d7425a2386612d84ced9c23cca9d572c89 (patch) | |
tree | 600c98aaeb7b0065d6cf3fe6715d5ccb1f245176 /pnlcalc.py | |
parent | a25069002d9933aa40a3172673d01c3ce9d3e8d3 (diff) |
Rename
Diffstat (limited to 'pnlcalc.py')
-rw-r--r-- | pnlcalc.py | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/pnlcalc.py b/pnlcalc.py new file mode 100644 index 0000000..309a0b0 --- /dev/null +++ b/pnlcalc.py @@ -0,0 +1,165 @@ +import csv +from datetime import datetime +from decimal import Decimal +from itertools import groupby +import logging +from typing import Dict, List + +from kraken import read_ledger +from ledger_action import LedgerAction +from trade import Trade +from trade_queue import FIFOQueue + +logging.basicConfig( + level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s" +) + +# Set up a dedicated logger for FIFOQueue +logger = logging.getLogger("pnlcalc") + + +def generate_report(sale_entries, proceeds: Decimal, crypto_asset, date_sold): + report = [] + sell_date = datetime.strptime(date_sold, "%Y-%m-%d").strftime("%d.%m.%Y") + proceeds = Decimal(proceeds) + + trade: Trade + for trade in sale_entries: + buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime( + "%d.%m.%Y" + ) + holding_period = ( + datetime.strptime(date_sold, "%Y-%m-%d") + - datetime.strptime(trade.date, "%Y-%m-%d") + ).days + short_or_long = "Short" if holding_period < 365 else "Long" + total_cost = trade.total_cost + gain_or_loss = proceeds - total_cost + + report.append( + { + "Amount": f"{trade.amount:.8f}", + "Currency": crypto_asset, + "Date Sold": sell_date, + "Date Acquired": buy_date_formatted, + "Short/Long": short_or_long, + "Buy/Input at": "Kraken", + "Sell/Output at": "Kraken", + "Proceeds": f"{proceeds:.2f}", + "Cost Basis": f"{total_cost:.2f}", + "Gain/Loss": f"{gain_or_loss:.2f}", + } + ) + + return report + + +fifo_queues: Dict[str, FIFOQueue] = {} # Separate FIFO queue per cryptocurrency +report = [] + + +def process_trade(refid: str, trades: List[LedgerAction]): + if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) + eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) + crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None) + + if eur_trade and crypto_trade: + crypto_asset = crypto_trade.asset + eur_amount = Decimal(eur_trade.amount) + eur_fee = Decimal(eur_trade.fee) + crypto_amount = Decimal(crypto_trade.amount) + crypto_fee = Decimal(crypto_trade.fee) + fifo_queues.setdefault(crypto_asset, FIFOQueue()) + + date_sold = eur_trade.date + + if eur_amount < 0: # Purchase of cryptocurrency + stake_amount = -eur_amount - eur_fee # Account for EUR fees + crypto_amount -= crypto_fee # Adjust for crypto fees + fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, stake_amount, date_sold, refid=refid)) + elif eur_amount > 0: # Sale of cryptocurrency + proceeds = eur_amount - eur_fee + fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, -proceeds, date_sold, refid=refid)) + # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) + # report.extend( + # generate_report(sale_entries, proceeds, crypto_asset, date_sold) + # ) + else: + raise ValueError(f"Unexpected trade grouping for refid {refid}") + else: + raise ValueError(f"Unexpected number of trades for refid {refid}") + + +def process_ledger(read_actions: List[LedgerAction], output_path: str): + # don't make any assumtptions about ledger sorting + # groupby requires sorted inputs + actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp)) + _grouped_actions = groupby(actions_sorted, lambda a: a.refid) + _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions] + # finally, sort groupy by first available timestamp + sorted_grouped_actions = sorted(_list_tuple_actions, key=lambda a: a[1][0].timestamp) + + for refid, actions in sorted_grouped_actions: + actions = list(actions) + + if len(actions) == 0: + logger.error("actions is empty") + continue + action = actions[0] + + # Group trades by refid + if action.type == "trade": + process_trade(refid, actions) + + elif action.type == "deposit" and action.asset != "EUR": + assert len(actions) == 1 + logger.error("Don't know how do handle deposits yet.") + # currency = action.asset + # fifo_queues.setdefault(currency, FIFOQueue()) + # amount = Decimal(action.amount) + # price = 0 + # current = fifo_queues[currency] + + # # remove transaction fees + # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase... + # # but I can't check that...) + # if len(current): + # current.remove_coins(action.fee) + + # current.add(amount, price, action.date) + + elif action.type == "withdrawal" and action.asset != "EUR": + assert len(actions) == 1 + logger.error("Don't know how do handle withdrawals yet.") + # currency = action.asset + # fifo_queues.setdefault(currency, FIFOQueue()) + # amount = Decimal(action.amount) + # price = 0 # Deposits typically have no associated cost basis + # current = fifo_queues[currency] + + # current.add(amount, price, action.date) + + # Write report to CSV + with open(output_path, "w", newline="") as csvfile: + fieldnames = [ + "Amount", + "Currency", + "Date Sold", + "Date Acquired", + "Short/Long", + "Buy/Input at", + "Sell/Output at", + "Proceeds", + "Cost Basis", + "Gain/Loss", + ] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(report) + + +# Usage +ledger_path = "ledgers.csv" # Replace with your ledger file path +output_path = "tax_report.csv" # Replace with your desired output file path +actions = read_ledger(ledger_path) +process_ledger(actions, output_path) |