import argparse import enum import errno import logging import time import requests import sys import stat from threading import Thread, Lock from queue import Queue from datetime import datetime as dtp from fuse import ( FUSE, Operations, FuseOSError, LoggingMixIn, fuse_exit, fuse_get_context, ) import urllib.parse logger = logging.getLogger(__name__) class APIChoice(enum.Enum): ACTIVITYPUB = "ActivityPub" MASTODON = "Mastodon" api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" class Status(object): def __init__(self, id: str, content: str, published: str): self.id = id self.content = content self.published = published class StatusProvider: def load_statuses(self) -> list[Status]: raise NotImplementedError def _fallback_not_found(self): return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] def _fallback_error(self, error_msg: str): return [Status("error", error_msg, "1970-01-01T00:00:00Z")] class ActivityPubStatusProvider(StatusProvider): def __init__(self, server: str, user: str): self.server = server self.user = user def load_statuses(self) -> list[Status]: url = api_url_ap_template.format(server=self.server, user=self.user) res = requests.get(url) if res.status_code == 404: return self._fallback_not_found() try: res.raise_for_status() except requests.exceptions.RequestException as e: logging.error("Request error: %s", e) return self._fallback_error(getattr(e, "message", str(e))) stats = res.json() status_items = stats.get("orderedItems", None) if not status_items: return self._fallback_error("Malformed content in querying AP outbox.") return [ Status( s["object"]["id"].split("/")[-1], s["object"]["content"], s["object"]["published"], ) for s in status_items if s.get("type", None) == "Create" and "object" in s and all(key in s["object"] for key in ["id", "content", "published"]) and len(s["object"]["content"]) ] class MastodonStatusProvider(StatusProvider): def __init__(self, server: str, user: str): self.server = server self.user = user self.userid = 0 def load_statuses(self) -> list[Status]: url = api_url_m_lookup_template.format(server=self.server) url += "?" + urllib.parse.urlencode({"acct": self.user}) res = requests.get(url) if res.status_code == 404: return self._fallback_not_found() try: res.raise_for_status() except requests.exceptions.RequestException as e: logging.error("Request error: %s", e) return self._fallback_error(getattr(e, "message", str(e))) user = res.json() self.userid = user.get("id", None) if not self.userid: return self._fallback_error("Malformed content in querying user ID.") url = api_url_m_status_template.format( server=self.server, uid=urllib.parse.quote(self.userid) ) res = requests.get(url) if res.status_code == 404: return self._fallback_not_found() try: res.raise_for_status() except requests.exceptions.RequestException as e: logging.error("Request error: %s", e) return self._fallback_error(getattr(e, "message", str(e))) statuses = res.json() return [ Status( s["id"], s["content"], s["created_at"], ) for s in statuses if all(key in s for key in ["id", "content", "created_at"]) and len(s["content"]) ] class StatusFileSystem(Operations, LoggingMixIn): def __init__(self): self._lock = Lock() with self._lock: self.statuses: list[Status] = [] self.fd = 0 def getattr(self, path, fh=None): (uid, gid, _) = fuse_get_context() if path == "/": return { "st_mode": (stat.S_IFDIR | 0o700), # Directory "st_nlink": 2, "st_uid": uid, "st_gid": gid, } with self._lock: found = next((s for s in self.statuses if s.id == path[1:]), None) if found: published_dt = dtp.fromisoformat(found.published) pubunix = published_dt.timestamp() return { "st_mode": (stat.S_IFREG | 0o400), "st_size": len(found.content.encode("utf8")), "st_nlink": 1, "st_uid": uid, "st_gid": gid, "st_ctime": pubunix, "st_mtime": pubunix, } raise FuseOSError(errno.ENOENT) def list_dir(self) -> list[str]: with self._lock: return [s.id for s in self.statuses] def readdir(self, path, fh): dir_entries = [] if path != "/": raise FuseOSError(errno.ENOENT) dir_entries = [".", ".."] dir_entries += self.list_dir() return dir_entries def add_statuses(self, statuses: list[Status]): with self._lock: self.statuses.extend(statuses) def open(self, path, flags): self.fd += 1 return self.fd def read(self, path, size, offset, fh): with self._lock: found = next(s for s in self.statuses if s.id == path[1:]) if found: return found.content.encode("utf8") raise FuseOSError(errno.ENOENT) def parse_arguments(): parser = argparse.ArgumentParser( description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" ) parser.add_argument( "mountpoint", help="The directory where the filesystem will be mounted" ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( "-a", "--activitypub", action="store_true", help="Use ActivityPub API" ) group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API") parser.add_argument("-s", "--server", required=True, help="The server/host URL") parser.add_argument( "-u", "--username", required=True, help="The username to fetch statuses for" ) args = parser.parse_args() if args.activitypub: args.api_choice = APIChoice.ACTIVITYPUB elif args.mastodon: args.api_choice = APIChoice.MASTODON else: parser.error("Must choose either ActivityPub or Mastodon API") return args # todo: make this a loop supporting paging def status_fetcher(fs: StatusFileSystem, sp: StatusProvider): logger.debug("Waiting to fetch statuses.") time.sleep(5) logger.debug("Fetch statuses.") st = sp.load_statuses() logger.debug("Add statuses to FS.") fs.add_statuses(st) logger.debug("Wait...") time.sleep(5) logger.debug("Done.") def main(args): try: if args.api_choice == APIChoice.MASTODON: status_provider = MastodonStatusProvider(args.server, args.username) else: status_provider = ActivityPubStatusProvider(args.server, args.username) myfs = StatusFileSystem() t = Thread(target=status_fetcher, args=(myfs, status_provider)) t.start() f = FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) except: fuse_exit() raise finally: # q.join() t.join() if __name__ == "__main__": logging.basicConfig(level=logging.INFO) logger.setLevel(logging.DEBUG) args = parse_arguments() main(args)