summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bla.py174
1 files changed, 109 insertions, 65 deletions
diff --git a/bla.py b/bla.py
index 4fc3475..979271f 100644
--- a/bla.py
+++ b/bla.py
@@ -2,6 +2,7 @@ import csv
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
+from itertools import groupby
import logging
from typing import Dict, List
@@ -9,103 +10,144 @@ from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue
+# Set up a dedicated logger for FIFOQueue
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
-def generate_report(sale_entries, proceeds: float|Decimal, crypto_asset, date_sold):
+
+def generate_report(sale_entries, proceeds: float | Decimal, crypto_asset, date_sold):
report = []
sell_date = datetime.strptime(date_sold, "%Y-%m-%d").strftime("%d.%m.%Y")
proceeds = Decimal(proceeds)
trade: Trade
for trade in sale_entries:
- buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime("%d.%m.%Y")
- holding_period = (datetime.strptime(date_sold, "%Y-%m-%d") - datetime.strptime(trade.date, "%Y-%m-%d")).days
+ buy_date_formatted = datetime.strptime(trade.date, "%Y-%m-%d").strftime(
+ "%d.%m.%Y"
+ )
+ holding_period = (
+ datetime.strptime(date_sold, "%Y-%m-%d")
+ - datetime.strptime(trade.date, "%Y-%m-%d")
+ ).days
short_or_long = "Short" if holding_period < 365 else "Long"
total_cost = trade.total_cost
gain_or_loss = proceeds - total_cost
- report.append({
- "Amount": f"{trade.amount:.8f}",
- "Currency": crypto_asset,
- "Date Sold": sell_date,
- "Date Acquired": buy_date_formatted,
- "Short/Long": short_or_long,
- "Buy/Input at": "Kraken",
- "Sell/Output at": "Kraken",
- "Proceeds": f"{proceeds:.2f}",
- "Cost Basis": f"{total_cost:.2f}",
- "Gain/Loss": f"{gain_or_loss:.2f}",
- })
+ report.append(
+ {
+ "Amount": f"{trade.amount:.8f}",
+ "Currency": crypto_asset,
+ "Date Sold": sell_date,
+ "Date Acquired": buy_date_formatted,
+ "Short/Long": short_or_long,
+ "Buy/Input at": "Kraken",
+ "Sell/Output at": "Kraken",
+ "Proceeds": f"{proceeds:.2f}",
+ "Cost Basis": f"{total_cost:.2f}",
+ "Gain/Loss": f"{gain_or_loss:.2f}",
+ }
+ )
return report
+
def read_kraken_ledger(csv_path: str) -> List[LedgerAction]:
- actions :List[LedgerAction] = []
+ actions: List[LedgerAction] = []
- with open(csv_path, 'r') as file:
+ with open(csv_path, "r") as file:
reader = csv.DictReader(file)
for row in reader:
date = row["time"].split(" ")[0]
- actions.append(LedgerAction(
- type=row["type"],
- asset=row["asset"],
- amount=Decimal(row["amount"]),
- fee=Decimal(row.get("fee", "0")),
- refid=row.get("refid", ""),
- date=date
- ))
+ actions.append(
+ LedgerAction(
+ type=row["type"],
+ asset=row["asset"],
+ amount=Decimal(row["amount"]),
+ fee=Decimal(row.get("fee", "0")),
+ refid=row.get("refid", ""),
+ date=date,
+ )
+ )
return actions
-def process_ledger(file_path :str, output_path :str):
- fifo_queues :Dict[str,FIFOQueue] = {} # Separate FIFO queue per cryptocurrency
- trades_by_refid :Dict[str, list[LedgerAction]] = defaultdict(list)
- report = []
- actions = read_kraken_ledger(file_path)
- for action in actions:
+fifo_queues: Dict[str, FIFOQueue] = {} # Separate FIFO queue per cryptocurrency
+report = []
+
+
+def process_trade(refid: str, trades: List[LedgerAction]):
+ if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
+ eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
+ crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None)
+
+ if eur_trade and crypto_trade:
+ crypto_asset = crypto_trade.asset
+ eur_amount = Decimal(eur_trade.amount)
+ eur_fee = Decimal(eur_trade.fee)
+ crypto_amount = Decimal(crypto_trade.amount)
+ crypto_fee = Decimal(crypto_trade.fee)
+ fifo_queues.setdefault(crypto_asset, FIFOQueue())
+
+ date_sold = eur_trade.date
+
+ if eur_amount < 0: # Purchase of cryptocurrency
+ stake_amount = -eur_amount - eur_fee # Account for EUR fees
+ crypto_amount -= crypto_fee # Adjust for crypto fees
+ fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold)
+ elif eur_amount > 0: # Sale of cryptocurrency
+ proceeds = eur_amount - eur_fee # Account for EUR fees
+ sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
+ report.extend(
+ generate_report(sale_entries, proceeds, crypto_asset, date_sold)
+ )
+ else:
+ raise ValueError(f"Unexpected trade grouping for refid {refid}")
+ else:
+ raise ValueError(f"Unexpected number of trades for refid {refid}")
+
+
+def process_ledger(read_actions: List[LedgerAction], output_path: str):
+ for refid, actions in groupby(
+ sorted(read_actions, key=lambda a: a.refid), lambda a: a.refid
+ ):
+ actions = list(actions)
+
+ if len(actions) == 0:
+ logger.error("actions is empty")
+ continue
+ action = actions[0]
+
# Group trades by refid
if action.type == "trade":
- trades_by_refid[action.refid].append(action)
+ process_trade(refid, actions)
- # Handle deposits
elif action.type == "deposit" and action.asset != "EUR":
+ assert len(actions) == 1
+ currency = action.asset
+ fifo_queues.setdefault(currency, FIFOQueue())
+ amount = Decimal(action.amount)
+ price = 0
+ current = fifo_queues[currency]
+
+ # remove transaction fees
+ # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase...
+ # but I can't check that...)
+ if len(current):
+ current.remove_coins(action.fee)
+
+ current.add(amount, price, action.date)
+
+ elif action.type == "withdrawal" and action.asset != "EUR":
+ assert len(actions) == 1
currency = action.asset
fifo_queues.setdefault(currency, FIFOQueue())
amount = Decimal(action.amount)
price = 0 # Deposits typically have no associated cost basis
fifo_queues[currency].add(amount, price, action.date)
- # Process grouped trades
- for refid, trades in trades_by_refid.items():
- if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
- eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
- crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None)
-
- if eur_trade and crypto_trade:
- crypto_asset = crypto_trade.asset
- eur_amount = Decimal(eur_trade.amount)
- eur_fee = Decimal(eur_trade.fee)
- crypto_amount = Decimal(crypto_trade.amount)
- crypto_fee = Decimal(crypto_trade.fee)
- fifo_queues.setdefault(crypto_asset, FIFOQueue())
-
- date_sold = eur_trade.date
-
- if eur_amount < 0: # Purchase of cryptocurrency
- stake_amount = -eur_amount - eur_fee # Account for EUR fees
- crypto_amount -= crypto_fee # Adjust for crypto fees
- fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold)
- elif eur_amount > 0: # Sale of cryptocurrency
- proceeds = eur_amount - eur_fee # Account for EUR fees
- sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
- report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold))
- else:
- raise ValueError(f"Unexpected trade grouping for refid {refid}")
- else:
- raise ValueError(f"Unexpected number of trades for refid {refid}")
-
# Write report to CSV
- with open(output_path, 'w', newline='') as csvfile:
+ with open(output_path, "w", newline="") as csvfile:
fieldnames = [
"Amount",
"Currency",
@@ -116,15 +158,17 @@ def process_ledger(file_path :str, output_path :str):
"Sell/Output at",
"Proceeds",
"Cost Basis",
- "Gain/Loss"
+ "Gain/Loss",
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(report)
+
logging.basicConfig(level=logging.DEBUG)
# Usage
ledger_path = "kraken_ledger.csv" # Replace with your ledger file path
output_path = "tax_report.csv" # Replace with your desired output file path
-process_ledger(ledger_path, output_path)
+actions = read_kraken_ledger(ledger_path)
+process_ledger(actions, output_path)