"""Mount a read-only FUSE filesystem listing a user's fediverse statuses.

Each status fetched from an ActivityPub outbox or from the Mastodon REST API
is exposed as one read-only regular file in the mount root, named after the
status id and containing the status content.
"""

import argparse
import enum
import errno
import logging
import stat
import sys
import time
import urllib.parse
from datetime import datetime as dtp
from queue import Queue
from threading import Event, Lock, Thread
from typing import Optional

import requests
from fuse import (
    FUSE,
    FuseOSError,
    LoggingMixIn,
    Operations,
    fuse_exit,
    fuse_get_context,
)

logger = logging.getLogger(__name__)

# Upper bound (seconds) for a single HTTP request, so that a stalled server
# cannot block the fetcher thread (and therefore shutdown) indefinitely.
_REQUEST_TIMEOUT_SECONDS = 30


class APIChoice(enum.Enum):
    """Which remote API is used to fetch statuses."""

    ACTIVITYPUB = "ActivityPub"
    MASTODON = "Mastodon"


api_url_ap_template = "https://{server}/users/{user}/outbox?page=true"
api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup"
api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses"


class Status:
    """A single status: its id, its content and its publication timestamp."""

    def __init__(self, id: str, content: str, published: str):
        self.id = id
        self.content = content
        self.published = published


class StatusProvider:
    """Base class for status sources; subclasses implement ``load_statuses``."""

    def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]:
        """Return one page of statuses and the ``max_id`` for the next page.

        The second element is ``None`` when there is nothing more to fetch.
        """
        raise NotImplementedError

    def _fallback_not_found(self):
        """Return a placeholder status list reporting an unknown user."""
        return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")]

    def _fallback_error(self, error_msg: str):
        """Return a placeholder status list carrying ``error_msg``."""
        return [Status("error", error_msg, "1970-01-01T00:00:00Z")]

    def _fetch_json(
        self, url: str
    ) -> tuple[Optional[object], Optional[list[Status]]]:
        """GET ``url`` and decode the JSON body.

        Returns:
            ``(payload, None)`` on success, or ``(None, fallback_statuses)``
            when the request failed, the server answered 404, or the body was
            not valid JSON.
        """
        try:
            res = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
            if res.status_code == 404:
                return None, self._fallback_not_found()
            res.raise_for_status()
            return res.json(), None
        except requests.exceptions.RequestException as e:
            logger.error("Request error: %s", e)
            return None, self._fallback_error(getattr(e, "message", str(e)))
        except ValueError as e:
            # Older requests versions raise a plain ValueError subclass when
            # the body is not JSON.
            logger.error("Invalid JSON from %s: %s", url, e)
            return None, self._fallback_error(
                "Malformed content: response is not valid JSON."
            )


class ActivityPubStatusProvider(StatusProvider):
    """Fetch statuses from a user's ActivityPub outbox."""

    def __init__(self, server: str, user: str):
        self.server = server
        self.user = user

    def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]:
        """Fetch one outbox page, optionally continuing after ``max_id``.

        Only ``Create`` activities whose object has a non-empty ``content``
        plus ``id`` and ``published`` are turned into statuses.

        Returns:
            ``(statuses, next_max_id)``. ``next_max_id`` is ``None`` on error
            or when the page produced no statuses, which ends pagination.
        """
        url = api_url_ap_template.format(server=self.server, user=self.user)
        if max_id:
            # The template already carries "?page=true", hence "&".
            url += "&" + urllib.parse.urlencode({"max_id": max_id})
        logger.debug("Get AP status from %s", url)

        stats, failure = self._fetch_json(url)
        if failure is not None:
            return failure, None

        status_items = stats.get("orderedItems") if isinstance(stats, dict) else None
        if not isinstance(status_items, list):
            return (
                self._fallback_error("Malformed content in querying AP outbox."),
                None,
            )
        if not status_items:
            # An empty page is the end of the outbox, not an error.
            return [], None

        ss = []
        for item in status_items:
            if not isinstance(item, dict) or item.get("type") != "Create":
                continue
            obj = item.get("object")
            # "object" may be a bare URL string; only embedded objects are usable.
            if not isinstance(obj, dict):
                continue
            if not all(key in obj for key in ("id", "content", "published")):
                continue
            if not obj["content"]:
                continue
            ss.append(
                Status(
                    str(obj["id"]).split("/")[-1],
                    obj["content"],
                    obj["published"],
                )
            )
        return ss, (ss[-1].id if ss else None)


class MastodonStatusProvider(StatusProvider):
    """Fetch statuses through the Mastodon REST API."""

    def __init__(self, server: str, user: str):
        self.server = server
        self.user = user
        self.userid = 0

    def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]:
        """Fetch one page of statuses, optionally continuing after ``max_id``.

        The account id is looked up on the first successful call and reused
        for later pages.

        Returns:
            ``(statuses, next_max_id)``. ``next_max_id`` is ``None`` on error
            or when the page produced no statuses, which ends pagination.
        """
        if not self.userid:
            url = api_url_m_lookup_template.format(server=self.server)
            url += "?" + urllib.parse.urlencode({"acct": self.user})
            user, failure = self._fetch_json(url)
            if failure is not None:
                return failure, None
            self.userid = user.get("id", None) if isinstance(user, dict) else None
            if not self.userid:
                return (
                    self._fallback_error("Malformed content in querying user ID."),
                    None,
                )

        url = api_url_m_status_template.format(
            server=self.server, uid=urllib.parse.quote(str(self.userid))
        )
        if max_id:
            url += "?" + urllib.parse.urlencode({"max_id": max_id})
        logger.debug("Get Masto status from %s", url)

        statuses, failure = self._fetch_json(url)
        if failure is not None:
            return failure, None
        if not isinstance(statuses, list):
            return (
                self._fallback_error("Malformed content in querying statuses."),
                None,
            )

        ss = [
            Status(str(s["id"]), s["content"], s["created_at"])
            for s in statuses
            if isinstance(s, dict)
            and all(key in s for key in ("id", "content", "created_at"))
            and s["content"]
        ]
        return ss, (ss[-1].id if ss else None)


class StatusFileSystem(Operations, LoggingMixIn):
    """Flat read-only filesystem with one file per known status."""

    def __init__(self):
        self._lock = Lock()
        self.statuses: list[Status] = []
        self.fd = 0

    def _find(self, path) -> Optional[Status]:
        """Return the status whose id matches ``path`` (minus the leading "/")."""
        name = path[1:]
        with self._lock:
            return next((s for s in self.statuses if s.id == name), None)

    @staticmethod
    def _published_timestamp(published) -> float:
        """Convert an ISO-8601 timestamp to Unix time, or 0.0 if unparseable."""
        try:
            if published.endswith("Z"):
                # datetime.fromisoformat only accepts a trailing "Z" from
                # Python 3.11 on; spell the UTC offset out explicitly.
                published = published[:-1] + "+00:00"
            return dtp.fromisoformat(published).timestamp()
        except (AttributeError, TypeError, ValueError, OverflowError, OSError):
            return 0.0

    def getattr(self, path, fh=None):
        """Return stat fields for the root directory or for a status file.

        Raises:
            FuseOSError: with ``ENOENT`` when ``path`` is neither "/" nor a
                known status id.
        """
        (uid, gid, _) = fuse_get_context()
        if path == "/":
            return {
                "st_mode": (stat.S_IFDIR | 0o700),  # Directory
                "st_nlink": 2,
                "st_uid": uid,
                "st_gid": gid,
            }
        found = self._find(path)
        if found is None:
            raise FuseOSError(errno.ENOENT)
        pubunix = self._published_timestamp(found.published)
        return {
            "st_mode": (stat.S_IFREG | 0o400),
            "st_size": len(found.content.encode("utf8")),
            "st_nlink": 1,
            "st_uid": uid,
            "st_gid": gid,
            "st_ctime": pubunix,
            "st_mtime": pubunix,
        }

    def list_dir(self) -> list[str]:
        """Return the ids of all known statuses."""
        with self._lock:
            return [s.id for s in self.statuses]

    def readdir(self, path, fh):
        """List the root directory; any other path raises ``ENOENT``."""
        if path != "/":
            raise FuseOSError(errno.ENOENT)
        return [".", ".."] + self.list_dir()

    def add_statuses(self, statuses: list[Status]):
        """Append ``statuses``, skipping ids that are already present.

        Ids double as file names, so a repeated id (for example the same
        fallback status returned twice) must not create a second entry.
        """
        with self._lock:
            known = {s.id for s in self.statuses}
            for status in statuses:
                if status.id in known:
                    continue
                known.add(status.id)
                self.statuses.append(status)

    def open(self, path, flags):
        """Hand out a fresh, monotonically increasing file handle number."""
        with self._lock:
            self.fd += 1
            return self.fd

    def read(self, path, size, offset, fh):
        """Return up to ``size`` bytes of the status content from ``offset``.

        Raises:
            FuseOSError: with ``ENOENT`` when no status matches ``path``.
        """
        found = self._find(path)
        if found is None:
            raise FuseOSError(errno.ENOENT)
        data = found.content.encode("utf8")
        # Honour the requested window; returning the whole content would
        # repeat data on chunked reads and overrun the caller's buffer size.
        return data[offset:offset + size]


def parse_arguments():
    """Parse the command line and return the namespace with ``api_choice`` set."""
    parser = argparse.ArgumentParser(
        description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon"
    )
    parser.add_argument(
        "mountpoint", help="The directory where the filesystem will be mounted"
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "-a", "--activitypub", action="store_true", help="Use ActivityPub API"
    )
    group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API")
    parser.add_argument("-s", "--server", required=True, help="The server/host URL")
    parser.add_argument(
        "-u", "--username", required=True, help="The username to fetch statuses for"
    )
    args = parser.parse_args()
    # The group is required and mutually exclusive, so exactly one flag is set.
    args.api_choice = (
        APIChoice.ACTIVITYPUB if args.activitypub else APIChoice.MASTODON
    )
    return args


def status_fetcher(fs: StatusFileSystem, sp: StatusProvider, sig_quit: Event):
    """Page through the provider's statuses and add them to the filesystem.

    Waits 5 seconds before each fetch. Stops when the provider reports no
    further page (``max_id`` of ``None``), when ``sig_quit`` is set, or when
    a fetch fails unexpectedly.
    """
    max_id = ""
    while (max_id is not None) and not (sig_quit.wait(5)):
        logger.debug("Fetch statuses.")
        try:
            statuses, max_id = sp.load_statuses(max_id)
        except Exception:
            # Log through the module logger rather than letting the thread
            # die with only a default traceback.
            logger.exception("Unexpected error while fetching statuses.")
            break
        logger.debug("Add statuses to FS.")
        fs.add_statuses(statuses)
        logger.debug("Waiting to fetch statuses.")
    logger.debug("Done.")


def main(args):
    """Start the fetcher thread and serve the filesystem until unmounted."""
    quit_evt = Event()
    t = None
    try:
        if args.api_choice == APIChoice.MASTODON:
            status_provider = MastodonStatusProvider(args.server, args.username)
        else:
            status_provider = ActivityPubStatusProvider(args.server, args.username)
        myfs = StatusFileSystem()
        t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt))
        t.start()
        # Blocks in the foreground until the filesystem is unmounted.
        FUSE(myfs, args.mountpoint, nothreads=True, foreground=True)
    except BaseException:
        # Includes KeyboardInterrupt/SystemExit; always re-raised below.
        fuse_exit()
        raise
    finally:
        quit_evt.set()
        if t is not None and t.is_alive():
            t.join()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logger.setLevel(logging.DEBUG)
    args = parse_arguments()
    main(args)