diff options
author | uvok | 2025-04-15 13:32:48 +0200 |
---|---|---|
committer | uvok | 2025-04-15 13:32:48 +0200 |
commit | 206c2fe18c322e94c3f0a18bee981c14b2a0cd56 (patch) | |
tree | b182a079b0e75a79ffdd6481400afdfc812801c2 | |
parent | 6e782e7368e5d66f05b5d9b2b3e36352ae0d7a77 (diff) |
Introduce LedgerAction
make code cleaner
-rw-r--r-- | bla.py | 107 | ||||
-rw-r--r-- | ledger_action.py | 12 |
2 files changed, 74 insertions, 45 deletions
@@ -5,6 +5,7 @@ from decimal import Decimal import logging from typing import Dict, List +from ledger_action import LedgerAction from trade import Trade from trade_queue import FIFOQueue @@ -37,55 +38,71 @@ def generate_report(sale_entries, proceeds: float|Decimal, crypto_asset, date_so return report -def process_ledger(file_path, output_path): - fifo_queues :Dict[str,FIFOQueue] = {} # Separate FIFO queue per cryptocurrency - trades_by_refid = defaultdict(list) - report = [] +def read_kraken_ledger(csv_path: str) -> List[LedgerAction]: + actions :List[LedgerAction] = [] - with open(file_path, 'r') as file: + with open(csv_path, 'r') as file: reader = csv.DictReader(file) for row in reader: - # Group trades by refid - if row["type"] == "trade": - trades_by_refid[row["refid"]].append(row) - - # Handle deposits - elif row["type"] == "deposit" and row["asset"] != "EUR": - currency = row["asset"] - fifo_queues.setdefault(currency, FIFOQueue()) - amount = Decimal(row["amount"]) - price = 0 # Deposits typically have no associated cost basis - date = row["time"].split(" ")[0] - fifo_queues[currency].add(amount, price, date) - - # Process grouped trades - for refid, trades in trades_by_refid.items(): - if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) - eur_trade = next((trade for trade in trades if trade["asset"] == "EUR"), None) - crypto_trade = next((trade for trade in trades if trade["asset"] != "EUR"), None) - - if eur_trade and crypto_trade: - crypto_asset = crypto_trade["asset"] - eur_amount = Decimal(eur_trade["amount"]) - eur_fee = Decimal(eur_trade["fee"]) - crypto_amount = Decimal(crypto_trade["amount"]) - crypto_fee = Decimal(crypto_trade["fee"]) - fifo_queues.setdefault(crypto_asset, FIFOQueue()) - - date_sold = eur_trade["time"].split(" ")[0] - - if eur_amount < 0: # Purchase of cryptocurrency - stake_amount = -eur_amount - eur_fee # Account for EUR fees - crypto_amount -= crypto_fee # Adjust for crypto fees - fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold) - elif eur_amount > 0: # Sale of cryptocurrency - proceeds = eur_amount - eur_fee # Account for EUR fees - sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) - report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold)) - else: - raise ValueError(f"Unexpected trade grouping for refid {refid}") + date = row["time"].split(" ")[0] + actions.append(LedgerAction( + type=row["type"], + asset=row["asset"], + amount=Decimal(row["amount"]), + fee=Decimal(row.get("fee", "0")), + refid=row.get("refid", ""), + date=date + )) + + return actions + +def process_ledger(file_path :str, output_path :str): + fifo_queues :Dict[str,FIFOQueue] = {} # Separate FIFO queue per cryptocurrency + trades_by_refid :Dict[str, list[LedgerAction]] = defaultdict(list) + report = [] + actions = read_kraken_ledger(file_path) + + for action in actions: + # Group trades by refid + if action.type == "trade": + trades_by_refid[action.refid].append(action) + + # Handle deposits + elif action.type == "deposit" and action.asset != "EUR": + currency = action.asset + fifo_queues.setdefault(currency, FIFOQueue()) + amount = Decimal(action.amount) + price = 0 # Deposits typically have no associated cost basis + fifo_queues[currency].add(amount, price, action.date) + + # Process grouped trades + for refid, trades in trades_by_refid.items(): + if len(trades) == 2: # Ensure we have two related rows (EUR + crypto) + eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None) + crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None) + + if eur_trade and crypto_trade: + crypto_asset = crypto_trade.asset + eur_amount = Decimal(eur_trade.amount) + eur_fee = Decimal(eur_trade.fee) + crypto_amount = Decimal(crypto_trade.amount) + crypto_fee = Decimal(crypto_trade.fee) + fifo_queues.setdefault(crypto_asset, FIFOQueue()) + + date_sold = eur_trade.date + + if eur_amount < 0: # Purchase of cryptocurrency + stake_amount = -eur_amount - eur_fee # Account for EUR fees + crypto_amount -= crypto_fee # Adjust for crypto fees + fifo_queues[crypto_asset].add(crypto_amount, stake_amount, date_sold) + elif eur_amount > 0: # Sale of cryptocurrency + proceeds = eur_amount - eur_fee # Account for EUR fees + sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount) + report.extend(generate_report(sale_entries, proceeds, crypto_asset, date_sold)) else: - raise ValueError(f"Unexpected number of trades for refid {refid}") + raise ValueError(f"Unexpected trade grouping for refid {refid}") + else: + raise ValueError(f"Unexpected number of trades for refid {refid}") # Write report to CSV with open(output_path, 'w', newline='') as csvfile: diff --git a/ledger_action.py b/ledger_action.py new file mode 100644 index 0000000..751d6b8 --- /dev/null +++ b/ledger_action.py @@ -0,0 +1,12 @@ +from dataclasses import dataclass +from decimal import Decimal +from typing import Optional + +@dataclass +class LedgerAction: + type: str + asset: str + amount: Decimal + fee: Decimal + date: str + refid: str |