summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoruvok2025-04-15 13:32:48 +0200
committeruvok2025-04-15 13:32:48 +0200
commit206c2fe18c322e94c3f0a18bee981c14b2a0cd56 (patch)
treeb182a079b0e75a79ffdd6481400afdfc812801c2
parent6e782e7368e5d66f05b5d9b2b3e36352ae0d7a77 (diff)
Introduce LedgerAction
make code cleaner
-rw-r--r--bla.py107
-rw-r--r--ledger_action.py12
2 files changed, 74 insertions, 45 deletions
diff --git a/bla.py b/bla.py
index ac9c60e..4fc3475 100644
--- a/bla.py
+++ b/bla.py
@@ -5,6 +5,7 @@ from decimal import Decimal
import logging
from typing import Dict, List
+from ledger_action import LedgerAction
from trade import Trade
from trade_queue import FIFOQueue
@@ -37,55 +38,71 @@ def generate_report(sale_entries, proceeds: float|Decimal, crypto_asset, date_so
return report
-def process_ledger(file_path, output_path):
- fifo_queues :Dict[str,FIFOQueue] = {} # Separate FIFO queue per cryptocurrency
- trades_by_refid = defaultdict(list)
- report = []
+def read_kraken_ledger(csv_path: str) -> List[LedgerAction]:
+ actions :List[LedgerAction] = []
- with open(file_path, 'r') as file:
+ with open(csv_path, 'r') as file:
reader = csv.DictReader(file)
for row in reader:
- # Group trades by refid
- if row["type"] == "trade":
- trades_by_refid[row["refid"]].append(row)
-
- # Handle deposits
- elif row["type"] == "deposit" and row["asset"] != "EUR":
- currency = row["asset"]
- fifo_queues.setdefault(currency, FIFOQueue())
- amount = Decimal(row["amount"])
- price = 0 # Deposits typically have no associated cost basis
- date = row["time"].split(" ")[0]
- fifo_queues[currency].add(amount, price, date)
-
- # Process grouped trades
- for refid, trades in trades_by_refid.items():
- if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
- eur_trade = next((trade for trade in trades if trade["asset"] == "EUR"), None)
- crypto_trade = next((trade for trade in trades if trade["asset"] != "EUR"), None)
-
- if eur_trade and crypto_trade:
- crypto_asset = crypto_trade["asset"]
- eur_amount = Decimal(eur_trade["amount"])
- eur_fee = Decimal(eur_trade["fee"])
- crypto_amount = Decimal(crypto_trade["amount"])
- crypto_fee = Decimal(crypto_trade["fee"])
- fifo_queues.setdefault(crypto_asset, FIFOQueue())
-
- date_sold = eur_trade["time"].split(" ")[0]
-
- if eur_amount < 0: # Purchase of cryptocurrency
- stake_amount = -eur_amount - eur_fee # Account for EUR fees
- crypto_amount -= crypto_fee # Adjust for crypto fees
- fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold)
- elif eur_amount > 0: # Sale of cryptocurrency
- proceeds = eur_amount - eur_fee # Account for EUR fees
- sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
- report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold))
- else:
- raise ValueError(f"Unexpected trade grouping for refid {refid}")
+ date = row["time"].split(" ")[0]
+ actions.append(LedgerAction(
+ type=row["type"],
+ asset=row["asset"],
+ amount=Decimal(row["amount"]),
+ fee=Decimal(row.get("fee", "0")),
+ refid=row.get("refid", ""),
+ date=date
+ ))
+
+ return actions
+
+def process_ledger(file_path :str, output_path :str):
+ fifo_queues :Dict[str,FIFOQueue] = {} # Separate FIFO queue per cryptocurrency
+ trades_by_refid :Dict[str, list[LedgerAction]] = defaultdict(list)
+ report = []
+ actions = read_kraken_ledger(file_path)
+
+ for action in actions:
+ # Group trades by refid
+ if action.type == "trade":
+ trades_by_refid[action.refid].append(action)
+
+ # Handle deposits
+ elif action.type == "deposit" and action.asset != "EUR":
+ currency = action.asset
+ fifo_queues.setdefault(currency, FIFOQueue())
+ amount = Decimal(action.amount)
+ price = 0 # Deposits typically have no associated cost basis
+ fifo_queues[currency].add(amount, price, action.date)
+
+ # Process grouped trades
+ for refid, trades in trades_by_refid.items():
+ if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
+ eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
+ crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None)
+
+ if eur_trade and crypto_trade:
+ crypto_asset = crypto_trade.asset
+ eur_amount = Decimal(eur_trade.amount)
+ eur_fee = Decimal(eur_trade.fee)
+ crypto_amount = Decimal(crypto_trade.amount)
+ crypto_fee = Decimal(crypto_trade.fee)
+ fifo_queues.setdefault(crypto_asset, FIFOQueue())
+
+ date_sold = eur_trade.date
+
+ if eur_amount < 0: # Purchase of cryptocurrency
+ stake_amount = -eur_amount - eur_fee # Account for EUR fees
+ crypto_amount -= crypto_fee # Adjust for crypto fees
+ fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold)
+ elif eur_amount > 0: # Sale of cryptocurrency
+ proceeds = eur_amount - eur_fee # Account for EUR fees
+ sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
+ report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold))
else:
- raise ValueError(f"Unexpected number of trades for refid {refid}")
+ raise ValueError(f"Unexpected trade grouping for refid {refid}")
+ else:
+ raise ValueError(f"Unexpected number of trades for refid {refid}")
# Write report to CSV
with open(output_path, 'w', newline='') as csvfile:
diff --git a/ledger_action.py b/ledger_action.py
new file mode 100644
index 0000000..751d6b8
--- /dev/null
+++ b/ledger_action.py
@@ -0,0 +1,12 @@
+from dataclasses import dataclass
+from decimal import Decimal
+from typing import Optional
+
+@dataclass
+class LedgerAction:
+ type: str
+ asset: str
+ amount: Decimal
+ fee: Decimal
+ date: str
+ refid: str