summaryrefslogtreecommitdiff
path: root/bla.py
diff options
context:
space:
mode:
authoruvok2025-04-18 11:35:41 +0200
committeruvok2025-04-18 11:35:41 +0200
commitd80b01d7425a2386612d84ced9c23cca9d572c89 (patch)
tree600c98aaeb7b0065d6cf3fe6715d5ccb1f245176 /bla.py
parenta25069002d9933aa40a3172673d01c3ce9d3e8d3 (diff)
Rename
Diffstat (limited to 'bla.py')
-rw-r--r--bla.py165
1 files changed, 0 insertions, 165 deletions
diff --git a/bla.py b/bla.py
deleted file mode 100644
index 309a0b0..0000000
--- a/bla.py
+++ /dev/null
@@ -1,165 +0,0 @@
-import csv
-from datetime import datetime
-from decimal import Decimal
-from itertools import groupby
-import logging
-from typing import Dict, List
-
-from kraken import read_ledger
-from ledger_action import LedgerAction
-from trade import Trade
-from trade_queue import FIFOQueue
-
-logging.basicConfig(
- level=logging.WARNING, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s"
-)
-
-# Set up a dedicated logger for FIFOQueue
-logger = logging.getLogger("pnlcalc")
-
-
-def generate_report(sale_entries, proceeds: Decimal, crypto_asset, date_sold):
- report = []
- sell_date = datetime.strptime(date_sold, "%Y-%m-%d").strftime("%d.%m.%Y")
- proceeds = Decimal(proceeds)
-
- trade: Trade
- for trade in sale_entries:
- buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime(
- "%d.%m.%Y"
- )
- holding_period = (
- datetime.strptime(date_sold, "%Y-%m-%d")
- - datetime.strptime(trade.date, "%Y-%m-%d")
- ).days
- short_or_long = "Short" if holding_period < 365 else "Long"
- total_cost = trade.total_cost
- gain_or_loss = proceeds - total_cost
-
- report.append(
- {
- "Amount": f"{trade.amount:.8f}",
- "Currency": crypto_asset,
- "Date Sold": sell_date,
- "Date Acquired": buy_date_formatted,
- "Short/Long": short_or_long,
- "Buy/Input at": "Kraken",
- "Sell/Output at": "Kraken",
- "Proceeds": f"{proceeds:.2f}",
- "Cost Basis": f"{total_cost:.2f}",
- "Gain/Loss": f"{gain_or_loss:.2f}",
- }
- )
-
- return report
-
-
-fifo_queues: Dict[str, FIFOQueue] = {} # Separate FIFO queue per cryptocurrency
-report = []
-
-
-def process_trade(refid: str, trades: List[LedgerAction]):
- if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
- eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
- crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None)
-
- if eur_trade and crypto_trade:
- crypto_asset = crypto_trade.asset
- eur_amount = Decimal(eur_trade.amount)
- eur_fee = Decimal(eur_trade.fee)
- crypto_amount = Decimal(crypto_trade.amount)
- crypto_fee = Decimal(crypto_trade.fee)
- fifo_queues.setdefault(crypto_asset, FIFOQueue())
-
- date_sold = eur_trade.date
-
- if eur_amount < 0: # Purchase of cryptocurrency
- stake_amount = -eur_amount - eur_fee # Account for EUR fees
- crypto_amount -= crypto_fee # Adjust for crypto fees
- fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, stake_amount, date_sold, refid=refid))
- elif eur_amount > 0: # Sale of cryptocurrency
- proceeds = eur_amount - eur_fee
- fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, -proceeds, date_sold, refid=refid))
- # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
- # report.extend(
- # generate_report(sale_entries, proceeds, crypto_asset, date_sold)
- # )
- else:
- raise ValueError(f"Unexpected trade grouping for refid {refid}")
- else:
- raise ValueError(f"Unexpected number of trades for refid {refid}")
-
-
-def process_ledger(read_actions: List[LedgerAction], output_path: str):
- # don't make any assumtptions about ledger sorting
- # groupby requires sorted inputs
- actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp))
- _grouped_actions = groupby(actions_sorted, lambda a: a.refid)
- _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions]
- # finally, sort groupy by first available timestamp
- sorted_grouped_actions = sorted(_list_tuple_actions, key=lambda a: a[1][0].timestamp)
-
- for refid, actions in sorted_grouped_actions:
- actions = list(actions)
-
- if len(actions) == 0:
- logger.error("actions is empty")
- continue
- action = actions[0]
-
- # Group trades by refid
- if action.type == "trade":
- process_trade(refid, actions)
-
- elif action.type == "deposit" and action.asset != "EUR":
- assert len(actions) == 1
- logger.error("Don't know how do handle deposits yet.")
- # currency = action.asset
- # fifo_queues.setdefault(currency, FIFOQueue())
- # amount = Decimal(action.amount)
- # price = 0
- # current = fifo_queues[currency]
-
- # # remove transaction fees
- # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase...
- # # but I can't check that...)
- # if len(current):
- # current.remove_coins(action.fee)
-
- # current.add(amount, price, action.date)
-
- elif action.type == "withdrawal" and action.asset != "EUR":
- assert len(actions) == 1
- logger.error("Don't know how do handle withdrawals yet.")
- # currency = action.asset
- # fifo_queues.setdefault(currency, FIFOQueue())
- # amount = Decimal(action.amount)
- # price = 0 # Deposits typically have no associated cost basis
- # current = fifo_queues[currency]
-
- # current.add(amount, price, action.date)
-
- # Write report to CSV
- with open(output_path, "w", newline="") as csvfile:
- fieldnames = [
- "Amount",
- "Currency",
- "Date Sold",
- "Date Acquired",
- "Short/Long",
- "Buy/Input at",
- "Sell/Output at",
- "Proceeds",
- "Cost Basis",
- "Gain/Loss",
- ]
- writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
- writer.writeheader()
- writer.writerows(report)
-
-
-# Usage
-ledger_path = "ledgers.csv" # Replace with your ledger file path
-output_path = "tax_report.csv" # Replace with your desired output file path
-actions = read_ledger(ledger_path)
-process_ledger(actions, output_path)