From cd7c102ca59d2c59065e5b369ebc862104ad1607 Mon Sep 17 00:00:00 2001 From: uvok Date: Mon, 20 Jan 2025 18:43:25 +0100 Subject: Add threading with fetcher loop --- hello-fusepy.py | 71 ++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 19 deletions(-) diff --git a/hello-fusepy.py b/hello-fusepy.py index fdeec00..a02bd9f 100644 --- a/hello-fusepy.py +++ b/hello-fusepy.py @@ -2,9 +2,12 @@ import argparse import enum import errno import logging +import time import requests import sys import stat +from threading import Thread, Lock +from queue import Queue from datetime import datetime as dtp from fuse import ( @@ -18,6 +21,9 @@ from fuse import ( import urllib.parse +logger = logging.getLogger(__name__) + + class APIChoice(enum.Enum): ACTIVITYPUB = "ActivityPub" MASTODON = "Mastodon" @@ -130,14 +136,13 @@ class MastodonStatusProvider(StatusProvider): class StatusFileSystem(Operations, LoggingMixIn): - def __init__(self, api: APIChoice, server: str, user: str): - self.statuses: list[Status] = [] + def __init__(self): + self._lock = Lock() + + with self._lock: + self.statuses: list[Status] = [] + self.fd = 0 - self.api = api - if api == APIChoice.MASTODON: - self.status_provider = MastodonStatusProvider(server, user) - else: - self.status_provider = ActivityPubStatusProvider(server, user) def getattr(self, path, fh=None): (uid, gid, _) = fuse_get_context() @@ -148,7 +153,9 @@ class StatusFileSystem(Operations, LoggingMixIn): "st_uid": uid, "st_gid": gid, } - found = next((s for s in self.statuses if s.id == path[1:]), None) + with self._lock: + found = next((s for s in self.statuses if s.id == path[1:]), None) + if found: published_dt = dtp.fromisoformat(found.published) pubunix = published_dt.timestamp() @@ -164,24 +171,29 @@ class StatusFileSystem(Operations, LoggingMixIn): raise FuseOSError(errno.ENOENT) def list_dir(self) -> list[str]: - return [s.id for s in self.statuses] + with self._lock: + return [s.id for s in self.statuses] def readdir(self, path, fh): dir_entries = [] if path != "/": raise FuseOSError(errno.ENOENT) dir_entries = [".", ".."] - if not self.statuses: - self.statuses = self.status_provider.load_statuses() dir_entries += self.list_dir() return dir_entries + def add_statuses(self, statuses: list[Status]): + with self._lock: + self.statuses.extend(statuses) + def open(self, path, flags): self.fd += 1 return self.fd def read(self, path, size, offset, fh): - found = next(s for s in self.statuses if s.id == path[1:]) + with self._lock: + found = next(s for s in self.statuses if s.id == path[1:]) + if found: return found.content.encode("utf8") raise FuseOSError(errno.ENOENT) @@ -215,21 +227,42 @@ def parse_arguments(): return args +# todo: make this a loop supporting paging +def status_fetcher(fs: StatusFileSystem, sp: StatusProvider): + logger.debug("Waiting to fetch statuses.") + time.sleep(5) + logger.debug("Fetch statuses.") + st = sp.load_statuses() + logger.debug("Add statuses to FS.") + fs.add_statuses(st) + logger.debug("Wait...") + time.sleep(5) + logger.debug("Done.") + def main(args): try: - FUSE( - StatusFileSystem(args.api_choice, args.server, args.username), - args.mountpoint, - nothreads=True, - foreground=True, - ) + if args.api_choice == APIChoice.MASTODON: + status_provider = MastodonStatusProvider(args.server, args.username) + else: + status_provider = ActivityPubStatusProvider(args.server, args.username) + + myfs = StatusFileSystem() + + t = Thread(target=status_fetcher, args=(myfs, status_provider)) + t.start() + + f = FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) except: fuse_exit() raise + finally: + # q.join() + t.join() if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.INFO) + logger.setLevel(logging.DEBUG) args = parse_arguments() main(args) -- cgit v1.2.3 From 35891f6eaf8e5eb923cef5a2eb57afb503937936 Mon Sep 17 00:00:00 2001 From: uvok Date: Tue, 21 Jan 2025 20:26:58 +0100 Subject: Handle paging, thread quit --- hello-fusepy.py | 71 ++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 45 insertions(+), 26 deletions(-) diff --git a/hello-fusepy.py b/hello-fusepy.py index a02bd9f..1db302b 100644 --- a/hello-fusepy.py +++ b/hello-fusepy.py @@ -6,7 +6,9 @@ import time import requests import sys import stat -from threading import Thread, Lock + +from typing import Optional +from threading import Thread, Lock, Event from queue import Queue from datetime import datetime as dtp @@ -42,7 +44,7 @@ class Status(object): class StatusProvider: - def load_statuses(self) -> list[Status]: + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: raise NotImplementedError def _fallback_not_found(self): @@ -57,24 +59,30 @@ class ActivityPubStatusProvider(StatusProvider): self.server = server self.user = user - def load_statuses(self) -> list[Status]: + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: url = api_url_ap_template.format(server=self.server, user=self.user) + if max_id: + url += "&" + urllib.parse.urlencode({"max_id": max_id}) + logger.debug("Get AP status from %s", url) res = requests.get(url) if res.status_code == 404: - return self._fallback_not_found() + return self._fallback_not_found(), None try: res.raise_for_status() except requests.exceptions.RequestException as e: logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))) + return self._fallback_error(getattr(e, "message", str(e))), None stats = res.json() status_items = stats.get("orderedItems", None) if not status_items: - return self._fallback_error("Malformed content in querying AP outbox.") + return ( + self._fallback_error("Malformed content in querying AP outbox."), + None, + ) - return [ + ss = [ Status( s["object"]["id"].split("/")[-1], s["object"]["content"], @@ -86,6 +94,7 @@ class ActivityPubStatusProvider(StatusProvider): and all(key in s["object"] for key in ["id", "content", "published"]) and len(s["object"]["content"]) ] + return ss, ss[-1].id class MastodonStatusProvider(StatusProvider): @@ -94,36 +103,41 @@ class MastodonStatusProvider(StatusProvider): self.user = user self.userid = 0 - def load_statuses(self) -> list[Status]: + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: url = api_url_m_lookup_template.format(server=self.server) url += "?" + urllib.parse.urlencode({"acct": self.user}) res = requests.get(url) if res.status_code == 404: - return self._fallback_not_found() + return self._fallback_not_found(), None try: res.raise_for_status() except requests.exceptions.RequestException as e: logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))) + return self._fallback_error(getattr(e, "message", str(e))), None user = res.json() self.userid = user.get("id", None) if not self.userid: - return self._fallback_error("Malformed content in querying user ID.") + return self._fallback_error("Malformed content in querying user ID."), None url = api_url_m_status_template.format( server=self.server, uid=urllib.parse.quote(self.userid) ) + + if max_id: + url += "?" + urllib.parse.urlencode({"max_id": max_id}) + logger.debug("Get Masto status from %s", url) + res = requests.get(url) if res.status_code == 404: - return self._fallback_not_found() + return self._fallback_not_found(), None try: res.raise_for_status() except requests.exceptions.RequestException as e: logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))) + return self._fallback_error(getattr(e, "message", str(e))), None statuses = res.json() - return [ + ss = [ Status( s["id"], s["content"], @@ -133,6 +147,7 @@ class MastodonStatusProvider(StatusProvider): if all(key in s for key in ["id", "content", "created_at"]) and len(s["content"]) ] + return ss, ss[-1].id class StatusFileSystem(Operations, LoggingMixIn): @@ -227,20 +242,23 @@ def parse_arguments(): return args + # todo: make this a loop supporting paging -def status_fetcher(fs: StatusFileSystem, sp: StatusProvider): - logger.debug("Waiting to fetch statuses.") - time.sleep(5) - logger.debug("Fetch statuses.") - st = sp.load_statuses() - logger.debug("Add statuses to FS.") - fs.add_statuses(st) - logger.debug("Wait...") - time.sleep(5) +def status_fetcher(fs: StatusFileSystem, sp: StatusProvider, sig_quit: Event): + max_id = "" + while (max_id is not None) and not (sig_quit.wait(5)): + logger.debug("Fetch statuses.") + st_rep = sp.load_statuses(max_id) + logger.debug("Add statuses to FS.") + fs.add_statuses(st_rep[0]) + max_id = st_rep[1] + logger.debug("Waiting to fetch statuses.") logger.debug("Done.") def main(args): + quit_evt = Event() + t = None try: if args.api_choice == APIChoice.MASTODON: status_provider = MastodonStatusProvider(args.server, args.username) @@ -249,7 +267,7 @@ def main(args): myfs = StatusFileSystem() - t = Thread(target=status_fetcher, args=(myfs, status_provider)) + t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt)) t.start() f = FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) @@ -257,8 +275,9 @@ def main(args): fuse_exit() raise finally: - # q.join() - t.join() + quit_evt.set() + if t: + t.join() if __name__ == "__main__": -- cgit v1.2.3 From 369555b0b6be2a7679af2b52c81558596e7141ec Mon Sep 17 00:00:00 2001 From: uvok Date: Tue, 21 Jan 2025 20:34:51 +0100 Subject: Fix: Need to consider 0-byte post for max_id otherwise a page full of reposts will result in an infinite loop. --- hello-fusepy.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/hello-fusepy.py b/hello-fusepy.py index 1db302b..175c9a3 100644 --- a/hello-fusepy.py +++ b/hello-fusepy.py @@ -82,6 +82,7 @@ class ActivityPubStatusProvider(StatusProvider): None, ) + # consider reposts for getting max_id... ss = [ Status( s["object"]["id"].split("/")[-1], @@ -92,9 +93,9 @@ class ActivityPubStatusProvider(StatusProvider): if s.get("type", None) == "Create" and "object" in s and all(key in s["object"] for key in ["id", "content", "published"]) - and len(s["object"]["content"]) ] - return ss, ss[-1].id + # ... but don't return it + return [s for s in ss if len(s.content)], ss[-1].id class MastodonStatusProvider(StatusProvider): @@ -137,6 +138,7 @@ class MastodonStatusProvider(StatusProvider): logging.error("Request error: %s", e) return self._fallback_error(getattr(e, "message", str(e))), None statuses = res.json() + # consider reposts for getting max_id... ss = [ Status( s["id"], @@ -145,9 +147,9 @@ class MastodonStatusProvider(StatusProvider): ) for s in statuses if all(key in s for key in ["id", "content", "created_at"]) - and len(s["content"]) ] - return ss, ss[-1].id + # ... but don't return it + return [s for s in ss if len(s.content)], ss[-1].id class StatusFileSystem(Operations, LoggingMixIn): -- cgit v1.2.3 From d49fb0eca080111ec2571de33a95a290ba062bd9 Mon Sep 17 00:00:00 2001 From: uvok Date: Wed, 22 Jan 2025 20:05:16 +0100 Subject: Add gitignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..da4129b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/venv/ +*.pyc +/.vscode/ -- cgit v1.2.3 From 0a4b9f6c43cddcfa4355f58cc6c5fb91c216e8d4 Mon Sep 17 00:00:00 2001 From: uvok Date: Wed, 22 Jan 2025 20:05:44 +0100 Subject: Split code into modules --- de_uvok/activitypub_fuse/__init__.py | 0 de_uvok/activitypub_fuse/status.py | 5 + de_uvok/activitypub_fuse/status_fuse.py | 76 ++++++++++ de_uvok/activitypub_fuse/status_provider.py | 122 ++++++++++++++++ hello-fusepy.py | 206 ++-------------------------- 5 files changed, 214 insertions(+), 195 deletions(-) create mode 100644 de_uvok/activitypub_fuse/__init__.py create mode 100644 de_uvok/activitypub_fuse/status.py create mode 100644 de_uvok/activitypub_fuse/status_fuse.py create mode 100644 de_uvok/activitypub_fuse/status_provider.py diff --git a/de_uvok/activitypub_fuse/__init__.py b/de_uvok/activitypub_fuse/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/de_uvok/activitypub_fuse/status.py b/de_uvok/activitypub_fuse/status.py new file mode 100644 index 0000000..d354335 --- /dev/null +++ b/de_uvok/activitypub_fuse/status.py @@ -0,0 +1,5 @@ +class Status(object): + def __init__(self, id: str, content: str, published: str): + self.id = id + self.content = content + self.published = published diff --git a/de_uvok/activitypub_fuse/status_fuse.py b/de_uvok/activitypub_fuse/status_fuse.py new file mode 100644 index 0000000..00ee747 --- /dev/null +++ b/de_uvok/activitypub_fuse/status_fuse.py @@ -0,0 +1,76 @@ +from datetime import datetime as dtp +import errno +import stat +from threading import Lock +from fuse import ( + Operations, + FuseOSError, + LoggingMixIn, + fuse_get_context, +) + +from .status import Status + + +class StatusFileSystem(Operations, LoggingMixIn): + def __init__(self): + self._lock = Lock() + + with self._lock: + self.statuses: list[Status] = [] + + self.fd = 0 + + def getattr(self, path, fh=None): + (uid, gid, _) = fuse_get_context() + if path == "/": + return { + "st_mode": (stat.S_IFDIR | 0o700), # Directory + "st_nlink": 2, + "st_uid": uid, + "st_gid": gid, + } + with self._lock: + found = next((s for s in self.statuses if s.id == path[1:]), None) + + if found: + published_dt = dtp.fromisoformat(found.published) + pubunix = published_dt.timestamp() + return { + "st_mode": (stat.S_IFREG | 0o400), + "st_size": len(found.content.encode("utf8")), + "st_nlink": 1, + "st_uid": uid, + "st_gid": gid, + "st_ctime": pubunix, + "st_mtime": pubunix, + } + raise FuseOSError(errno.ENOENT) + + def list_dir(self) -> list[str]: + with self._lock: + return [s.id for s in self.statuses] + + def readdir(self, path, fh): + dir_entries = [] + if path != "/": + raise FuseOSError(errno.ENOENT) + dir_entries = [".", ".."] + dir_entries += self.list_dir() + return dir_entries + + def add_statuses(self, statuses: list[Status]): + with self._lock: + self.statuses.extend(statuses) + + def open(self, path, flags): # type: ignore + self.fd += 1 + return self.fd + + def read(self, path, size, offset, fh): # type: ignore + with self._lock: + found = next(s for s in self.statuses if s.id == path[1:]) + + if found: + return found.content.encode("utf8") + raise FuseOSError(errno.ENOENT) diff --git a/de_uvok/activitypub_fuse/status_provider.py b/de_uvok/activitypub_fuse/status_provider.py new file mode 100644 index 0000000..b434ad5 --- /dev/null +++ b/de_uvok/activitypub_fuse/status_provider.py @@ -0,0 +1,122 @@ +from typing import Optional + +import logging +import requests +import urllib.parse + +from .status import Status + +logger = logging.Logger(__name__) + +_api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" +_api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" +_api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" + + +class StatusProvider: + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: + raise NotImplementedError + + def _fallback_not_found(self): + return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] + + def _fallback_error(self, error_msg: str): + return [Status("error", error_msg, "1970-01-01T00:00:00Z")] + + +class ActivityPubStatusProvider(StatusProvider): + def __init__(self, server: str, user: str): + self.server = server + self.user = user + + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: + url = _api_url_ap_template.format(server=self.server, user=self.user) + if max_id: + url += "&" + urllib.parse.urlencode({"max_id": max_id}) + logger.debug("Get AP status from %s", url) + res = requests.get(url) + if res.status_code == 404: + return self._fallback_not_found(), None + + try: + res.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error("Request error: %s", e) + return self._fallback_error(getattr(e, "message", str(e))), None + + stats = res.json() + status_items = stats.get("orderedItems", None) + if not status_items: + return ( + self._fallback_error("Malformed content in querying AP outbox."), + None, + ) + + # consider reposts for getting max_id... + ss = [ + Status( + s["object"]["id"].split("/")[-1], + s["object"]["content"], + s["object"]["published"], + ) + for s in status_items + if s.get("type", None) == "Create" + and "object" in s + and all(key in s["object"] for key in ["id", "content", "published"]) + ] + # ... but don't return it + return [s for s in ss if len(s.content)], ss[-1].id + + +class MastodonStatusProvider(StatusProvider): + def __init__(self, server: str, user: str): + self.server = server + self.user = user + self.userid = 0 + + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: + url = _api_url_m_lookup_template.format(server=self.server) + url += "?" + urllib.parse.urlencode({"acct": self.user}) + res = requests.get(url) + if res.status_code == 404: + return self._fallback_not_found(), None + try: + res.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error("Request error: %s", e) + return self._fallback_error(getattr(e, "message", str(e))), None + + user = res.json() + self.userid = user.get("id", None) + if not self.userid: + return self._fallback_error("Malformed content in querying user ID."), None + + url = _api_url_m_status_template.format( + server=self.server, uid=urllib.parse.quote(self.userid) + ) + + if max_id: + url += "?" + urllib.parse.urlencode({"max_id": max_id}) + logger.debug("Get Masto status from %s", url) + + res = requests.get(url) + if res.status_code == 404: + return self._fallback_not_found(), None + try: + res.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error("Request error: %s", e) + return self._fallback_error(getattr(e, "message", str(e))), None + statuses = res.json() + # consider reposts for getting max_id... + ss = [ + Status( + s["id"], + s["content"], + s["created_at"], + ) + for s in statuses + if all(key in s for key in ["id", "content", "created_at"]) + ] + # ... but don't return it + return [s for s in ss if len(s.content)], ss[-1].id diff --git a/hello-fusepy.py b/hello-fusepy.py index 175c9a3..a0a7d45 100644 --- a/hello-fusepy.py +++ b/hello-fusepy.py @@ -11,17 +11,18 @@ from typing import Optional from threading import Thread, Lock, Event from queue import Queue -from datetime import datetime as dtp -from fuse import ( - FUSE, - Operations, - FuseOSError, - LoggingMixIn, - fuse_exit, - fuse_get_context, -) + import urllib.parse +from fuse import FUSE, fuse_exit + +from de_uvok.activitypub_fuse.status_fuse import StatusFileSystem +from de_uvok.activitypub_fuse.status_provider import ( + ActivityPubStatusProvider, + MastodonStatusProvider, + StatusProvider, +) + logger = logging.getLogger(__name__) @@ -31,191 +32,6 @@ class APIChoice(enum.Enum): MASTODON = "Mastodon" -api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" -api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" -api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" - - -class Status(object): - def __init__(self, id: str, content: str, published: str): - self.id = id - self.content = content - self.published = published - - -class StatusProvider: - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - raise NotImplementedError - - def _fallback_not_found(self): - return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] - - def _fallback_error(self, error_msg: str): - return [Status("error", error_msg, "1970-01-01T00:00:00Z")] - - -class ActivityPubStatusProvider(StatusProvider): - def __init__(self, server: str, user: str): - self.server = server - self.user = user - - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - url = api_url_ap_template.format(server=self.server, user=self.user) - if max_id: - url += "&" + urllib.parse.urlencode({"max_id": max_id}) - logger.debug("Get AP status from %s", url) - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - - stats = res.json() - status_items = stats.get("orderedItems", None) - if not status_items: - return ( - self._fallback_error("Malformed content in querying AP outbox."), - None, - ) - - # consider reposts for getting max_id... - ss = [ - Status( - s["object"]["id"].split("/")[-1], - s["object"]["content"], - s["object"]["published"], - ) - for s in status_items - if s.get("type", None) == "Create" - and "object" in s - and all(key in s["object"] for key in ["id", "content", "published"]) - ] - # ... but don't return it - return [s for s in ss if len(s.content)], ss[-1].id - - -class MastodonStatusProvider(StatusProvider): - def __init__(self, server: str, user: str): - self.server = server - self.user = user - self.userid = 0 - - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - url = api_url_m_lookup_template.format(server=self.server) - url += "?" + urllib.parse.urlencode({"acct": self.user}) - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - - user = res.json() - self.userid = user.get("id", None) - if not self.userid: - return self._fallback_error("Malformed content in querying user ID."), None - - url = api_url_m_status_template.format( - server=self.server, uid=urllib.parse.quote(self.userid) - ) - - if max_id: - url += "?" + urllib.parse.urlencode({"max_id": max_id}) - logger.debug("Get Masto status from %s", url) - - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - statuses = res.json() - # consider reposts for getting max_id... - ss = [ - Status( - s["id"], - s["content"], - s["created_at"], - ) - for s in statuses - if all(key in s for key in ["id", "content", "created_at"]) - ] - # ... but don't return it - return [s for s in ss if len(s.content)], ss[-1].id - - -class StatusFileSystem(Operations, LoggingMixIn): - def __init__(self): - self._lock = Lock() - - with self._lock: - self.statuses: list[Status] = [] - - self.fd = 0 - - def getattr(self, path, fh=None): - (uid, gid, _) = fuse_get_context() - if path == "/": - return { - "st_mode": (stat.S_IFDIR | 0o700), # Directory - "st_nlink": 2, - "st_uid": uid, - "st_gid": gid, - } - with self._lock: - found = next((s for s in self.statuses if s.id == path[1:]), None) - - if found: - published_dt = dtp.fromisoformat(found.published) - pubunix = published_dt.timestamp() - return { - "st_mode": (stat.S_IFREG | 0o400), - "st_size": len(found.content.encode("utf8")), - "st_nlink": 1, - "st_uid": uid, - "st_gid": gid, - "st_ctime": pubunix, - "st_mtime": pubunix, - } - raise FuseOSError(errno.ENOENT) - - def list_dir(self) -> list[str]: - with self._lock: - return [s.id for s in self.statuses] - - def readdir(self, path, fh): - dir_entries = [] - if path != "/": - raise FuseOSError(errno.ENOENT) - dir_entries = [".", ".."] - dir_entries += self.list_dir() - return dir_entries - - def add_statuses(self, statuses: list[Status]): - with self._lock: - self.statuses.extend(statuses) - - def open(self, path, flags): - self.fd += 1 - return self.fd - - def read(self, path, size, offset, fh): - with self._lock: - found = next(s for s in self.statuses if s.id == path[1:]) - - if found: - return found.content.encode("utf8") - raise FuseOSError(errno.ENOENT) - - def parse_arguments(): parser = argparse.ArgumentParser( description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" @@ -272,7 +88,7 @@ def main(args): t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt)) t.start() - f = FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) + FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) except: fuse_exit() raise -- cgit v1.2.3 From 976e5526ed7f95e0e5600b23daa88f82bcf8fe16 Mon Sep 17 00:00:00 2001 From: uvok Date: Wed, 22 Jan 2025 20:08:41 +0100 Subject: Rename --- de_uvok/activitypub_fuse/fuse.py | 76 +++++++++++++++++ de_uvok/activitypub_fuse/providers.py | 122 ++++++++++++++++++++++++++++ de_uvok/activitypub_fuse/status.py | 5 -- de_uvok/activitypub_fuse/status_fuse.py | 76 ----------------- de_uvok/activitypub_fuse/status_provider.py | 122 ---------------------------- de_uvok/activitypub_fuse/types.py | 5 ++ hello-fusepy.py | 4 +- 7 files changed, 205 insertions(+), 205 deletions(-) create mode 100644 de_uvok/activitypub_fuse/fuse.py create mode 100644 de_uvok/activitypub_fuse/providers.py delete mode 100644 de_uvok/activitypub_fuse/status.py delete mode 100644 de_uvok/activitypub_fuse/status_fuse.py delete mode 100644 de_uvok/activitypub_fuse/status_provider.py create mode 100644 de_uvok/activitypub_fuse/types.py diff --git a/de_uvok/activitypub_fuse/fuse.py b/de_uvok/activitypub_fuse/fuse.py new file mode 100644 index 0000000..8be5351 --- /dev/null +++ b/de_uvok/activitypub_fuse/fuse.py @@ -0,0 +1,76 @@ +from datetime import datetime as dtp +import errno +import stat +from threading import Lock +from fuse import ( + Operations, + FuseOSError, + LoggingMixIn, + fuse_get_context, +) + +from .types import Status + + +class StatusFileSystem(Operations, LoggingMixIn): + def __init__(self): + self._lock = Lock() + + with self._lock: + self.statuses: list[Status] = [] + + self.fd = 0 + + def getattr(self, path, fh=None): + (uid, gid, _) = fuse_get_context() + if path == "/": + return { + "st_mode": (stat.S_IFDIR | 0o700), # Directory + "st_nlink": 2, + "st_uid": uid, + "st_gid": gid, + } + with self._lock: + found = next((s for s in self.statuses if s.id == path[1:]), None) + + if found: + published_dt = dtp.fromisoformat(found.published) + pubunix = published_dt.timestamp() + return { + "st_mode": (stat.S_IFREG | 0o400), + "st_size": len(found.content.encode("utf8")), + "st_nlink": 1, + "st_uid": uid, + "st_gid": gid, + "st_ctime": pubunix, + "st_mtime": pubunix, + } + raise FuseOSError(errno.ENOENT) + + def list_dir(self) -> list[str]: + with self._lock: + return [s.id for s in self.statuses] + + def readdir(self, path, fh): + dir_entries = [] + if path != "/": + raise FuseOSError(errno.ENOENT) + dir_entries = [".", ".."] + dir_entries += self.list_dir() + return dir_entries + + def add_statuses(self, statuses: list[Status]): + with self._lock: + self.statuses.extend(statuses) + + def open(self, path, flags): # type: ignore + self.fd += 1 + return self.fd + + def read(self, path, size, offset, fh): # type: ignore + with self._lock: + found = next(s for s in self.statuses if s.id == path[1:]) + + if found: + return found.content.encode("utf8") + raise FuseOSError(errno.ENOENT) diff --git a/de_uvok/activitypub_fuse/providers.py b/de_uvok/activitypub_fuse/providers.py new file mode 100644 index 0000000..19cee62 --- /dev/null +++ b/de_uvok/activitypub_fuse/providers.py @@ -0,0 +1,122 @@ +from typing import Optional + +import logging +import requests +import urllib.parse + +from .types import Status + +logger = logging.Logger(__name__) + +_api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" +_api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" +_api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" + + +class StatusProvider: + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: + raise NotImplementedError + + def _fallback_not_found(self): + return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] + + def _fallback_error(self, error_msg: str): + return [Status("error", error_msg, "1970-01-01T00:00:00Z")] + + +class ActivityPubStatusProvider(StatusProvider): + def __init__(self, server: str, user: str): + self.server = server + self.user = user + + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: + url = _api_url_ap_template.format(server=self.server, user=self.user) + if max_id: + url += "&" + urllib.parse.urlencode({"max_id": max_id}) + logger.debug("Get AP status from %s", url) + res = requests.get(url) + if res.status_code == 404: + return self._fallback_not_found(), None + + try: + res.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error("Request error: %s", e) + return self._fallback_error(getattr(e, "message", str(e))), None + + stats = res.json() + status_items = stats.get("orderedItems", None) + if not status_items: + return ( + self._fallback_error("Malformed content in querying AP outbox."), + None, + ) + + # consider reposts for getting max_id... + ss = [ + Status( + s["object"]["id"].split("/")[-1], + s["object"]["content"], + s["object"]["published"], + ) + for s in status_items + if s.get("type", None) == "Create" + and "object" in s + and all(key in s["object"] for key in ["id", "content", "published"]) + ] + # ... but don't return it + return [s for s in ss if len(s.content)], ss[-1].id + + +class MastodonStatusProvider(StatusProvider): + def __init__(self, server: str, user: str): + self.server = server + self.user = user + self.userid = 0 + + def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: + url = _api_url_m_lookup_template.format(server=self.server) + url += "?" + urllib.parse.urlencode({"acct": self.user}) + res = requests.get(url) + if res.status_code == 404: + return self._fallback_not_found(), None + try: + res.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error("Request error: %s", e) + return self._fallback_error(getattr(e, "message", str(e))), None + + user = res.json() + self.userid = user.get("id", None) + if not self.userid: + return self._fallback_error("Malformed content in querying user ID."), None + + url = _api_url_m_status_template.format( + server=self.server, uid=urllib.parse.quote(self.userid) + ) + + if max_id: + url += "?" + urllib.parse.urlencode({"max_id": max_id}) + logger.debug("Get Masto status from %s", url) + + res = requests.get(url) + if res.status_code == 404: + return self._fallback_not_found(), None + try: + res.raise_for_status() + except requests.exceptions.RequestException as e: + logger.error("Request error: %s", e) + return self._fallback_error(getattr(e, "message", str(e))), None + statuses = res.json() + # consider reposts for getting max_id... + ss = [ + Status( + s["id"], + s["content"], + s["created_at"], + ) + for s in statuses + if all(key in s for key in ["id", "content", "created_at"]) + ] + # ... but don't return it + return [s for s in ss if len(s.content)], ss[-1].id diff --git a/de_uvok/activitypub_fuse/status.py b/de_uvok/activitypub_fuse/status.py deleted file mode 100644 index d354335..0000000 --- a/de_uvok/activitypub_fuse/status.py +++ /dev/null @@ -1,5 +0,0 @@ -class Status(object): - def __init__(self, id: str, content: str, published: str): - self.id = id - self.content = content - self.published = published diff --git a/de_uvok/activitypub_fuse/status_fuse.py b/de_uvok/activitypub_fuse/status_fuse.py deleted file mode 100644 index 00ee747..0000000 --- a/de_uvok/activitypub_fuse/status_fuse.py +++ /dev/null @@ -1,76 +0,0 @@ -from datetime import datetime as dtp -import errno -import stat -from threading import Lock -from fuse import ( - Operations, - FuseOSError, - LoggingMixIn, - fuse_get_context, -) - -from .status import Status - - -class StatusFileSystem(Operations, LoggingMixIn): - def __init__(self): - self._lock = Lock() - - with self._lock: - self.statuses: list[Status] = [] - - self.fd = 0 - - def getattr(self, path, fh=None): - (uid, gid, _) = fuse_get_context() - if path == "/": - return { - "st_mode": (stat.S_IFDIR | 0o700), # Directory - "st_nlink": 2, - "st_uid": uid, - "st_gid": gid, - } - with self._lock: - found = next((s for s in self.statuses if s.id == path[1:]), None) - - if found: - published_dt = dtp.fromisoformat(found.published) - pubunix = published_dt.timestamp() - return { - "st_mode": (stat.S_IFREG | 0o400), - "st_size": len(found.content.encode("utf8")), - "st_nlink": 1, - "st_uid": uid, - "st_gid": gid, - "st_ctime": pubunix, - "st_mtime": pubunix, - } - raise FuseOSError(errno.ENOENT) - - def list_dir(self) -> list[str]: - with self._lock: - return [s.id for s in self.statuses] - - def readdir(self, path, fh): - dir_entries = [] - if path != "/": - raise FuseOSError(errno.ENOENT) - dir_entries = [".", ".."] - dir_entries += self.list_dir() - return dir_entries - - def add_statuses(self, statuses: list[Status]): - with self._lock: - self.statuses.extend(statuses) - - def open(self, path, flags): # type: ignore - self.fd += 1 - return self.fd - - def read(self, path, size, offset, fh): # type: ignore - with self._lock: - found = next(s for s in self.statuses if s.id == path[1:]) - - if found: - return found.content.encode("utf8") - raise FuseOSError(errno.ENOENT) diff --git a/de_uvok/activitypub_fuse/status_provider.py b/de_uvok/activitypub_fuse/status_provider.py deleted file mode 100644 index b434ad5..0000000 --- a/de_uvok/activitypub_fuse/status_provider.py +++ /dev/null @@ -1,122 +0,0 @@ -from typing import Optional - -import logging -import requests -import urllib.parse - -from .status import Status - -logger = logging.Logger(__name__) - -_api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" -_api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" -_api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" - - -class StatusProvider: - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - raise NotImplementedError - - def _fallback_not_found(self): - return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] - - def _fallback_error(self, error_msg: str): - return [Status("error", error_msg, "1970-01-01T00:00:00Z")] - - -class ActivityPubStatusProvider(StatusProvider): - def __init__(self, server: str, user: str): - self.server = server - self.user = user - - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - url = _api_url_ap_template.format(server=self.server, user=self.user) - if max_id: - url += "&" + urllib.parse.urlencode({"max_id": max_id}) - logger.debug("Get AP status from %s", url) - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logger.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - - stats = res.json() - status_items = stats.get("orderedItems", None) - if not status_items: - return ( - self._fallback_error("Malformed content in querying AP outbox."), - None, - ) - - # consider reposts for getting max_id... - ss = [ - Status( - s["object"]["id"].split("/")[-1], - s["object"]["content"], - s["object"]["published"], - ) - for s in status_items - if s.get("type", None) == "Create" - and "object" in s - and all(key in s["object"] for key in ["id", "content", "published"]) - ] - # ... but don't return it - return [s for s in ss if len(s.content)], ss[-1].id - - -class MastodonStatusProvider(StatusProvider): - def __init__(self, server: str, user: str): - self.server = server - self.user = user - self.userid = 0 - - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - url = _api_url_m_lookup_template.format(server=self.server) - url += "?" + urllib.parse.urlencode({"acct": self.user}) - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logger.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - - user = res.json() - self.userid = user.get("id", None) - if not self.userid: - return self._fallback_error("Malformed content in querying user ID."), None - - url = _api_url_m_status_template.format( - server=self.server, uid=urllib.parse.quote(self.userid) - ) - - if max_id: - url += "?" + urllib.parse.urlencode({"max_id": max_id}) - logger.debug("Get Masto status from %s", url) - - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logger.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - statuses = res.json() - # consider reposts for getting max_id... - ss = [ - Status( - s["id"], - s["content"], - s["created_at"], - ) - for s in statuses - if all(key in s for key in ["id", "content", "created_at"]) - ] - # ... but don't return it - return [s for s in ss if len(s.content)], ss[-1].id diff --git a/de_uvok/activitypub_fuse/types.py b/de_uvok/activitypub_fuse/types.py new file mode 100644 index 0000000..d354335 --- /dev/null +++ b/de_uvok/activitypub_fuse/types.py @@ -0,0 +1,5 @@ +class Status(object): + def __init__(self, id: str, content: str, published: str): + self.id = id + self.content = content + self.published = published diff --git a/hello-fusepy.py b/hello-fusepy.py index a0a7d45..f3fd478 100644 --- a/hello-fusepy.py +++ b/hello-fusepy.py @@ -16,8 +16,8 @@ import urllib.parse from fuse import FUSE, fuse_exit -from de_uvok.activitypub_fuse.status_fuse import StatusFileSystem -from de_uvok.activitypub_fuse.status_provider import ( +from de_uvok.activitypub_fuse.fuse import StatusFileSystem +from de_uvok.activitypub_fuse.providers import ( ActivityPubStatusProvider, MastodonStatusProvider, StatusProvider, -- cgit v1.2.3 From 29ad7bd09839ec958ee40a20442f2ab337da86e1 Mon Sep 17 00:00:00 2001 From: uvok Date: Thu, 23 Jan 2025 19:53:38 +0100 Subject: Rename script --- fuse-ap.py | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ hello-fusepy.py | 105 -------------------------------------------------------- 2 files changed, 105 insertions(+), 105 deletions(-) create mode 100644 fuse-ap.py delete mode 100644 hello-fusepy.py diff --git a/fuse-ap.py b/fuse-ap.py new file mode 100644 index 0000000..f3fd478 --- /dev/null +++ b/fuse-ap.py @@ -0,0 +1,105 @@ +import argparse +import enum +import errno +import logging +import time +import requests +import sys +import stat + +from typing import Optional +from threading import Thread, Lock, Event +from queue import Queue + + +import urllib.parse + +from fuse import FUSE, fuse_exit + +from de_uvok.activitypub_fuse.fuse import StatusFileSystem +from de_uvok.activitypub_fuse.providers import ( + ActivityPubStatusProvider, + MastodonStatusProvider, + StatusProvider, +) + + +logger = logging.getLogger(__name__) + + +class APIChoice(enum.Enum): + ACTIVITYPUB = "ActivityPub" + MASTODON = "Mastodon" + + +def parse_arguments(): + parser = argparse.ArgumentParser( + description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" + ) + parser.add_argument( + "mountpoint", help="The directory where the filesystem will be mounted" + ) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "-a", "--activitypub", action="store_true", help="Use ActivityPub API" + ) + group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API") + parser.add_argument("-s", "--server", required=True, help="The server/host URL") + parser.add_argument( + "-u", "--username", required=True, help="The username to fetch statuses for" + ) + + args = parser.parse_args() + + if args.activitypub: + args.api_choice = APIChoice.ACTIVITYPUB + elif args.mastodon: + args.api_choice = APIChoice.MASTODON + else: + parser.error("Must choose either ActivityPub or Mastodon API") + + return args + + +# todo: make this a loop supporting paging +def status_fetcher(fs: StatusFileSystem, sp: StatusProvider, sig_quit: Event): + max_id = "" + while (max_id is not None) and not (sig_quit.wait(5)): + logger.debug("Fetch statuses.") + st_rep = sp.load_statuses(max_id) + logger.debug("Add statuses to FS.") + fs.add_statuses(st_rep[0]) + max_id = st_rep[1] + logger.debug("Waiting to fetch statuses.") + logger.debug("Done.") + + +def main(args): + quit_evt = Event() + t = None + try: + if args.api_choice == APIChoice.MASTODON: + status_provider = MastodonStatusProvider(args.server, args.username) + else: + status_provider = ActivityPubStatusProvider(args.server, args.username) + + myfs = StatusFileSystem() + + t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt)) + t.start() + + FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) + except: + fuse_exit() + raise + finally: + quit_evt.set() + if t: + t.join() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + logger.setLevel(logging.DEBUG) + args = parse_arguments() + main(args) diff --git a/hello-fusepy.py b/hello-fusepy.py deleted file mode 100644 index f3fd478..0000000 --- a/hello-fusepy.py +++ /dev/null @@ -1,105 +0,0 @@ -import argparse -import enum -import errno -import logging -import time -import requests -import sys -import stat - -from typing import Optional -from threading import Thread, Lock, Event -from queue import Queue - - -import urllib.parse - -from fuse import FUSE, fuse_exit - -from de_uvok.activitypub_fuse.fuse import StatusFileSystem -from de_uvok.activitypub_fuse.providers import ( - ActivityPubStatusProvider, - MastodonStatusProvider, - StatusProvider, -) - - -logger = logging.getLogger(__name__) - - -class APIChoice(enum.Enum): - ACTIVITYPUB = "ActivityPub" - MASTODON = "Mastodon" - - -def parse_arguments(): - parser = argparse.ArgumentParser( - description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" - ) - parser.add_argument( - "mountpoint", help="The directory where the filesystem will be mounted" - ) - group = parser.add_mutually_exclusive_group(required=True) - group.add_argument( - "-a", "--activitypub", action="store_true", help="Use ActivityPub API" - ) - group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API") - parser.add_argument("-s", "--server", required=True, help="The server/host URL") - parser.add_argument( - "-u", "--username", required=True, help="The username to fetch statuses for" - ) - - args = parser.parse_args() - - if args.activitypub: - args.api_choice = APIChoice.ACTIVITYPUB - elif args.mastodon: - args.api_choice = APIChoice.MASTODON - else: - parser.error("Must choose either ActivityPub or Mastodon API") - - return args - - -# todo: make this a loop supporting paging -def status_fetcher(fs: StatusFileSystem, sp: StatusProvider, sig_quit: Event): - max_id = "" - while (max_id is not None) and not (sig_quit.wait(5)): - logger.debug("Fetch statuses.") - st_rep = sp.load_statuses(max_id) - logger.debug("Add statuses to FS.") - fs.add_statuses(st_rep[0]) - max_id = st_rep[1] - logger.debug("Waiting to fetch statuses.") - logger.debug("Done.") - - -def main(args): - quit_evt = Event() - t = None - try: - if args.api_choice == APIChoice.MASTODON: - status_provider = MastodonStatusProvider(args.server, args.username) - else: - status_provider = ActivityPubStatusProvider(args.server, args.username) - - myfs = StatusFileSystem() - - t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt)) - t.start() - - FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) - except: - fuse_exit() - raise - finally: - quit_evt.set() - if t: - t.join() - - -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - logger.setLevel(logging.DEBUG) - args = parse_arguments() - main(args) -- cgit v1.2.3