diff options
author | uvok | 2025-04-18 11:35:41 +0200 |
---|---|---|
committer | uvok | 2025-04-18 11:35:41 +0200 |
commit | d80b01d7425a2386612d84ced9c23cca9d572c89 (patch) | |
tree | 600c98aaeb7b0065d6cf3fe6715d5ccb1f245176 /bla.py | |
parent | a25069002d9933aa40a3172673d01c3ce9d3e8d3 (diff) |
Rename
Diffstat (limited to 'bla.py')
-rw-r--r-- | bla.py | 165 |
1 files changed, 0 insertions, 165 deletions
@@ -1,165 +0,0 @@ -import csv -from datetime import datetime -from decimal import Decimal -from itertools import groupby -import logging -from typing import Dict, List - -from kraken import read_ledger -from ledger_action import LedgerAction -from trade import Trade -from trade_queue import FIFOQueue - -logging.basicConfig( - level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s" -) - -# Set up a dedicated logger for FIFOQueue -logger = logging.getLogger("pnlcalc") - - -def generate_report(sale_entries, proceeds: Decimal, crypto_asset, date_sold): - report = [] - sell_date = datetime.strptime(date_sold, "%Y-%m-%d").strftime("%d.%m.%Y") - proceeds = Decimal(proceeds) - - trade: Trade - for trade in sale_entries: - buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime( - "%d.%m.%Y" - ) - holding_period = ( - datetime.strptime(date_sold, "%Y-%m-%d") - - datetime.strptime(trade.date, "%Y-%m-%d") - ).days - short_or_long = "Short" if holding_period < 365 else "Long" - total_cost = trade.total_cost - gain_or_loss = proceeds - total_cost - - report.append( - { - "Amount": f"{trade.amount:.8f}", - "Currency": crypto_asset, - "Date Sold": sell_date, - "Date Acquired": buy_date_formatted, - "Short/Long": short_or_long, - "Buy/Input at": "Kraken", - "Sell/Output at": "Kraken", - "Proceeds": f"{proceeds:.2f}", - "Cost Basis": f"{total_cost:.2f}", - "Gain/Loss": f"{gain_or_loss:.2f}", - } - ) - - return report - - -fifo_queues: Dict[str, FIFOQueue] = {} # Separate FIFO queue per cryptocurrency -report = [] - - -def process_trade(refid: str, trades: List[LedgerAction]): - if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) - eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) - crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None) - - if eur_trade and crypto_trade: - crypto_asset = crypto_trade.asset - eur_amount = Decimal(eur_trade.amount) - eur_fee = Decimal(eur_trade.fee) - crypto_amount = Decimal(crypto_trade.amount) - crypto_fee = Decimal(crypto_trade.fee) - fifo_queues.setdefault(crypto_asset, FIFOQueue()) - - date_sold = eur_trade.date - - if eur_amount < 0: # Purchase of cryptocurrency - stake_amount = -eur_amount - eur_fee # Account for EUR fees - crypto_amount -= crypto_fee # Adjust for crypto fees - fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, stake_amount, date_sold, refid=refid)) - elif eur_amount > 0: # Sale of cryptocurrency - proceeds = eur_amount - eur_fee - fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, -proceeds, date_sold, refid=refid)) - # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) - # report.extend( - # generate_report(sale_entries, proceeds, crypto_asset, date_sold) - # ) - else: - raise ValueError(f"Unexpected trade grouping for refid {refid}") - else: - raise ValueError(f"Unexpected number of trades for refid {refid}") - - -def process_ledger(read_actions: List[LedgerAction], output_path: str): - # don't make any assumtptions about ledger sorting - # groupby requires sorted inputs - actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp)) - _grouped_actions = groupby(actions_sorted, lambda a: a.refid) - _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions] - # finally, sort groupy by first available timestamp - sorted_grouped_actions = sorted(_list_tuple_actions, key=lambda a: a[1][0].timestamp) - - for refid, actions in sorted_grouped_actions: - actions = list(actions) - - if len(actions) == 0: - logger.error("actions is empty") - continue - action = actions[0] - - # Group trades by refid - if action.type == "trade": - process_trade(refid, actions) - - elif action.type == "deposit" and action.asset != "EUR": - assert len(actions) == 1 - logger.error("Don't know how do handle deposits yet.") - # currency = action.asset - # fifo_queues.setdefault(currency, FIFOQueue()) - # amount = Decimal(action.amount) - # price = 0 - # current = fifo_queues[currency] - - # # remove transaction fees - # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase... - # # but I can't check that...) - # if len(current): - # current.remove_coins(action.fee) - - # current.add(amount, price, action.date) - - elif action.type == "withdrawal" and action.asset != "EUR": - assert len(actions) == 1 - logger.error("Don't know how do handle withdrawals yet.") - # currency = action.asset - # fifo_queues.setdefault(currency, FIFOQueue()) - # amount = Decimal(action.amount) - # price = 0 # Deposits typically have no associated cost basis - # current = fifo_queues[currency] - - # current.add(amount, price, action.date) - - # Write report to CSV - with open(output_path, "w", newline="") as csvfile: - fieldnames = [ - "Amount", - "Currency", - "Date Sold", - "Date Acquired", - "Short/Long", - "Buy/Input at", - "Sell/Output at", - "Proceeds", - "Cost Basis", - "Gain/Loss", - ] - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(report) - - -# Usage -ledger_path = "ledgers.csv" # Replace with your ledger file path -output_path = "tax_report.csv" # Replace with your desired output file path -actions = read_ledger(ledger_path) -process_ledger(actions, output_path) |