summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ledger_process.py105
-rw-r--r--pnlcalc.py89
2 files changed, 111 insertions, 83 deletions
diff --git a/ledger_process.py b/ledger_process.py
new file mode 100644
index 0000000..7703e58
--- /dev/null
+++ b/ledger_process.py
@@ -0,0 +1,105 @@
+from decimal import Decimal
+from itertools import groupby
+import logging
+from typing import Dict, List
+
+from ledger_action import LedgerAction
+from trade import Trade
+from trade_queue import FIFOQueue
+
+logger = logging.getLogger(__name__)
+
+
+class LedgerProcess:
+ def __init__(self):
+ # Separate FIFO queue per cryptocurrency
+ self.fifo_queues: Dict[str, FIFOQueue] = {}
+ self.external_wallet: Dict[str, FIFOQueue] = {}
+
+ def process_ledger(self, read_actions: List[LedgerAction]):
+ # don't make any assumtptions about ledger sorting
+ # groupby requires sorted inputs
+ actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp))
+ _grouped_actions = groupby(actions_sorted, lambda a: a.refid)
+ _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions]
+ # finally, sort groupy by first available timestamp
+ sorted_grouped_actions = sorted(
+ _list_tuple_actions, key=lambda a: a[1][0].timestamp
+ )
+
+ for refid, actions in sorted_grouped_actions:
+ actions = list(actions)
+
+ if len(actions) == 0:
+ logger.error("actions is empty")
+ continue
+ action = actions[0]
+
+ # Group trades by refid
+ if action.type == "trade":
+ self._process_trade(refid, actions)
+
+ elif action.type == "deposit" and action.asset != "EUR":
+ assert len(actions) == 1
+ logger.error("Don't know how do handle deposits yet.")
+ # currency = action.asset
+ # fifo_queues.setdefault(currency, FIFOQueue())
+ # amount = Decimal(action.amount)
+ # price = 0
+ # current = fifo_queues[currency]
+
+ # # remove transaction fees
+ # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase...
+ # # but I can't check that...)
+ # if len(current):
+ # current.remove_coins(action.fee)
+
+ # current.add(amount, price, action.date)
+
+ elif action.type == "withdrawal" and action.asset != "EUR":
+ assert len(actions) == 1
+ logger.error("Don't know how do handle withdrawals yet.")
+ # currency = action.asset
+ # fifo_queues.setdefault(currency, FIFOQueue())
+ # amount = Decimal(action.amount)
+ # price = 0 # Deposits typically have no associated cost basis
+ # current = fifo_queues[currency]
+
+ # current.add(amount, price, action.date)
+
+ def _process_trade(self, refid: str, trades: List[LedgerAction]):
+ if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
+ eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
+ crypto_trade = next(
+ (trade for trade in trades if trade.asset != "EUR"), None
+ )
+
+ if eur_trade and crypto_trade:
+ crypto_asset = crypto_trade.asset
+ eur_amount = Decimal(eur_trade.amount)
+ eur_fee = Decimal(eur_trade.fee)
+ crypto_amount = Decimal(crypto_trade.amount)
+ crypto_fee = Decimal(crypto_trade.fee)
+ self.fifo_queues.setdefault(crypto_asset, FIFOQueue())
+
+ date_sold = eur_trade.date
+
+ if eur_amount < 0: # Purchase of cryptocurrency
+ stake_amount = -eur_amount - eur_fee # Account for EUR fees
+ crypto_amount -= crypto_fee # Adjust for crypto fees
+ self.fifo_queues[crypto_asset].add_trade(
+ Trade(crypto_amount, stake_amount, date_sold, refid=refid)
+ )
+ elif eur_amount > 0: # Sale of cryptocurrency
+ proceeds = eur_amount - eur_fee
+ self.fifo_queues[crypto_asset].add_trade(
+ Trade(crypto_amount, -proceeds, date_sold, refid=refid)
+ )
+ # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
+ # report.extend(
+ # generate_report(sale_entries, proceeds, crypto_asset, date_sold)
+ # )
+ else:
+ raise ValueError(f"Unexpected trade grouping for refid {refid}")
+ else:
+ raise ValueError(f"Unexpected number of trades for refid {refid}")
diff --git a/pnlcalc.py b/pnlcalc.py
index 309a0b0..51152c2 100644
--- a/pnlcalc.py
+++ b/pnlcalc.py
@@ -7,6 +7,7 @@ from typing import Dict, List
from kraken import read_ledger
from ledger_action import LedgerAction
+from ledger_process import LedgerProcess
from trade import Trade
from trade_queue import FIFOQueue
@@ -54,91 +55,10 @@ def generate_report(sale_entries, proceeds: Decimal, crypto_asset, date_sold):
return report
-fifo_queues: Dict[str, FIFOQueue] = {} # Separate FIFO queue per cryptocurrency
report = []
-def process_trade(refid: str, trades: List[LedgerAction]):
- if len(trades) == 2: # Ensure we have two related rows (EUR + crypto)
- eur_trade = next((trade for trade in trades if trade.asset == "EUR"), None)
- crypto_trade = next((trade for trade in trades if trade.asset != "EUR"), None)
-
- if eur_trade and crypto_trade:
- crypto_asset = crypto_trade.asset
- eur_amount = Decimal(eur_trade.amount)
- eur_fee = Decimal(eur_trade.fee)
- crypto_amount = Decimal(crypto_trade.amount)
- crypto_fee = Decimal(crypto_trade.fee)
- fifo_queues.setdefault(crypto_asset, FIFOQueue())
-
- date_sold = eur_trade.date
-
- if eur_amount < 0: # Purchase of cryptocurrency
- stake_amount = -eur_amount - eur_fee # Account for EUR fees
- crypto_amount -= crypto_fee # Adjust for crypto fees
- fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, stake_amount, date_sold, refid=refid))
- elif eur_amount > 0: # Sale of cryptocurrency
- proceeds = eur_amount - eur_fee
- fifo_queues[crypto_asset].add_trade(Trade(crypto_amount, -proceeds, date_sold, refid=refid))
- # sale_entries = fifo_queues[crypto_asset].remove_coins(-crypto_amount)
- # report.extend(
- # generate_report(sale_entries, proceeds, crypto_asset, date_sold)
- # )
- else:
- raise ValueError(f"Unexpected trade grouping for refid {refid}")
- else:
- raise ValueError(f"Unexpected number of trades for refid {refid}")
-
-
-def process_ledger(read_actions: List[LedgerAction], output_path: str):
- # don't make any assumtptions about ledger sorting
- # groupby requires sorted inputs
- actions_sorted = sorted(read_actions, key=lambda a: (a.refid, a.timestamp))
- _grouped_actions = groupby(actions_sorted, lambda a: a.refid)
- _list_tuple_actions = [(k, list(v)) for k, v in _grouped_actions]
- # finally, sort groupy by first available timestamp
- sorted_grouped_actions = sorted(_list_tuple_actions, key=lambda a: a[1][0].timestamp)
-
- for refid, actions in sorted_grouped_actions:
- actions = list(actions)
-
- if len(actions) == 0:
- logger.error("actions is empty")
- continue
- action = actions[0]
-
- # Group trades by refid
- if action.type == "trade":
- process_trade(refid, actions)
-
- elif action.type == "deposit" and action.asset != "EUR":
- assert len(actions) == 1
- logger.error("Don't know how do handle deposits yet.")
- # currency = action.asset
- # fifo_queues.setdefault(currency, FIFOQueue())
- # amount = Decimal(action.amount)
- # price = 0
- # current = fifo_queues[currency]
-
- # # remove transaction fees
- # # (but only if it has a previous withdraw, or rather, if these are coins from a previous purchase...
- # # but I can't check that...)
- # if len(current):
- # current.remove_coins(action.fee)
-
- # current.add(amount, price, action.date)
-
- elif action.type == "withdrawal" and action.asset != "EUR":
- assert len(actions) == 1
- logger.error("Don't know how do handle withdrawals yet.")
- # currency = action.asset
- # fifo_queues.setdefault(currency, FIFOQueue())
- # amount = Decimal(action.amount)
- # price = 0 # Deposits typically have no associated cost basis
- # current = fifo_queues[currency]
-
- # current.add(amount, price, action.date)
-
+def write_report(output_path: str):
# Write report to CSV
with open(output_path, "w", newline="") as csvfile:
fieldnames = [
@@ -162,4 +82,7 @@ def process_ledger(read_actions: List[LedgerAction], output_path: str):
ledger_path = "ledgers.csv" # Replace with your ledger file path
output_path = "tax_report.csv" # Replace with your desired output file path
actions = read_ledger(ledger_path)
-process_ledger(actions, output_path)
+lp = LedgerProcess()
+lp.process_ledger(actions)
+pass
+# write_report(output_path)