diff options
author | uvok | 2025-01-22 20:05:44 +0100 |
---|---|---|
committer | uvok | 2025-01-22 20:05:44 +0100 |
commit | 0a4b9f6c43cddcfa4355f58cc6c5fb91c216e8d4 (patch) | |
tree | f48060725095e3e44b2d3332b81321ddacf50a6a /hello-fusepy.py | |
parent | d49fb0eca080111ec2571de33a95a290ba062bd9 (diff) |
Split code into modules
Diffstat (limited to 'hello-fusepy.py')
-rw-r--r-- | hello-fusepy.py | 206 |
1 files changed, 11 insertions, 195 deletions
diff --git a/hello-fusepy.py b/hello-fusepy.py index 175c9a3..a0a7d45 100644 --- a/hello-fusepy.py +++ b/hello-fusepy.py @@ -11,17 +11,18 @@ from typing import Optional from threading import Thread, Lock, Event from queue import Queue -from datetime import datetime as dtp -from fuse import ( - FUSE, - Operations, - FuseOSError, - LoggingMixIn, - fuse_exit, - fuse_get_context, -) + import urllib.parse +from fuse import FUSE, fuse_exit + +from de_uvok.activitypub_fuse.status_fuse import StatusFileSystem +from de_uvok.activitypub_fuse.status_provider import ( + ActivityPubStatusProvider, + MastodonStatusProvider, + StatusProvider, +) + logger = logging.getLogger(__name__) @@ -31,191 +32,6 @@ class APIChoice(enum.Enum): MASTODON = "Mastodon" -api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" -api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" -api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" - - -class Status(object): - def __init__(self, id: str, content: str, published: str): - self.id = id - self.content = content - self.published = published - - -class StatusProvider: - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - raise NotImplementedError - - def _fallback_not_found(self): - return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] - - def _fallback_error(self, error_msg: str): - return [Status("error", error_msg, "1970-01-01T00:00:00Z")] - - -class ActivityPubStatusProvider(StatusProvider): - def __init__(self, server: str, user: str): - self.server = server - self.user = user - - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - url = api_url_ap_template.format(server=self.server, user=self.user) - if max_id: - url += "&" + urllib.parse.urlencode({"max_id": max_id}) - logger.debug("Get AP status from %s", url) - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - - stats = res.json() - status_items = stats.get("orderedItems", None) - if not status_items: - return ( - self._fallback_error("Malformed content in querying AP outbox."), - None, - ) - - # consider reposts for getting max_id... - ss = [ - Status( - s["object"]["id"].split("/")[-1], - s["object"]["content"], - s["object"]["published"], - ) - for s in status_items - if s.get("type", None) == "Create" - and "object" in s - and all(key in s["object"] for key in ["id", "content", "published"]) - ] - # ... but don't return it - return [s for s in ss if len(s.content)], ss[-1].id - - -class MastodonStatusProvider(StatusProvider): - def __init__(self, server: str, user: str): - self.server = server - self.user = user - self.userid = 0 - - def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: - url = api_url_m_lookup_template.format(server=self.server) - url += "?" + urllib.parse.urlencode({"acct": self.user}) - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - - user = res.json() - self.userid = user.get("id", None) - if not self.userid: - return self._fallback_error("Malformed content in querying user ID."), None - - url = api_url_m_status_template.format( - server=self.server, uid=urllib.parse.quote(self.userid) - ) - - if max_id: - url += "?" + urllib.parse.urlencode({"max_id": max_id}) - logger.debug("Get Masto status from %s", url) - - res = requests.get(url) - if res.status_code == 404: - return self._fallback_not_found(), None - try: - res.raise_for_status() - except requests.exceptions.RequestException as e: - logging.error("Request error: %s", e) - return self._fallback_error(getattr(e, "message", str(e))), None - statuses = res.json() - # consider reposts for getting max_id... - ss = [ - Status( - s["id"], - s["content"], - s["created_at"], - ) - for s in statuses - if all(key in s for key in ["id", "content", "created_at"]) - ] - # ... but don't return it - return [s for s in ss if len(s.content)], ss[-1].id - - -class StatusFileSystem(Operations, LoggingMixIn): - def __init__(self): - self._lock = Lock() - - with self._lock: - self.statuses: list[Status] = [] - - self.fd = 0 - - def getattr(self, path, fh=None): - (uid, gid, _) = fuse_get_context() - if path == "/": - return { - "st_mode": (stat.S_IFDIR | 0o700), # Directory - "st_nlink": 2, - "st_uid": uid, - "st_gid": gid, - } - with self._lock: - found = next((s for s in self.statuses if s.id == path[1:]), None) - - if found: - published_dt = dtp.fromisoformat(found.published) - pubunix = published_dt.timestamp() - return { - "st_mode": (stat.S_IFREG | 0o400), - "st_size": len(found.content.encode("utf8")), - "st_nlink": 1, - "st_uid": uid, - "st_gid": gid, - "st_ctime": pubunix, - "st_mtime": pubunix, - } - raise FuseOSError(errno.ENOENT) - - def list_dir(self) -> list[str]: - with self._lock: - return [s.id for s in self.statuses] - - def readdir(self, path, fh): - dir_entries = [] - if path != "/": - raise FuseOSError(errno.ENOENT) - dir_entries = [".", ".."] - dir_entries += self.list_dir() - return dir_entries - - def add_statuses(self, statuses: list[Status]): - with self._lock: - self.statuses.extend(statuses) - - def open(self, path, flags): - self.fd += 1 - return self.fd - - def read(self, path, size, offset, fh): - with self._lock: - found = next(s for s in self.statuses if s.id == path[1:]) - - if found: - return found.content.encode("utf8") - raise FuseOSError(errno.ENOENT) - - def parse_arguments(): parser = argparse.ArgumentParser( description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" @@ -272,7 +88,7 @@ def main(args): t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt)) t.start() - f = FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) + FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) except: fuse_exit() raise |