from datetime import datetime as dtp
import errno
import logging
import stat
from queue import SimpleQueue, Empty

from fuse import (
    Operations,
    FuseOSError,
    LoggingMixIn,
    fuse_get_context,
)

from .types import Status

logger = logging.getLogger(__name__)


# NOTE(review): with Operations listed before LoggingMixIn, the MRO may pick
# Operations.__call__ and bypass the mixin's logging -- confirm against the
# fuse package in use before reordering the bases.
class StatusFileSystem(Operations, LoggingMixIn):
    """
    Implements a FUSE file system for mapping (ActivityPub, Mastodon, ...)
    statuses to text files.

    The root directory contains one read-only regular file per known status,
    named after the status id; the file body is the UTF-8 encoded status
    content.
    """

    def __init__(self):
        """
        Initialize a new instance of StatusFileSystem, to be used with FUSE.
        """
        # Batches handed over by add_statuses(), drained lazily on listing.
        self.__queue: SimpleQueue[list[Status]] = SimpleQueue()
        # Statuses currently exposed as files.
        self.__statuses: list[Status] = []
        # Last file handle number returned by open().
        self.__fd = 0

    def __find_status(self, path):
        """
        Return the known status whose id matches ``path``, or None.

        Args:
            path (str): Absolute path inside the mount, e.g. "/<status id>".
        """
        status_id = path[1:]  # drop the leading "/"
        return next((s for s in self.__statuses if s.id == status_id), None)

    def getattr(self, path, fh=None):
        """
        Return the stat fields for the root directory or a status file.

        Args:
            path (str): Absolute path inside the mount.
            fh: Unused file handle.

        Returns:
            dict: ``st_*`` entries describing the directory or file.

        Raises:
            FuseOSError: With ENOENT if ``path`` is neither "/" nor the id
                of a known status.
        """
        (uid, gid, _) = fuse_get_context()
        if path == "/":
            return {
                "st_mode": (stat.S_IFDIR | 0o700),  # Directory
                "st_nlink": 2,
                "st_uid": uid,
                "st_gid": gid,
            }

        found = self.__find_status(path)
        if found is None:
            raise FuseOSError(errno.ENOENT)

        # The publication time doubles as both change and modification time.
        pubunix = dtp.fromisoformat(found.published).timestamp()
        return {
            "st_mode": (stat.S_IFREG | 0o400),
            # Size in bytes of exactly what read() serves.
            "st_size": len(found.content.encode("utf8")),
            "st_nlink": 1,
            "st_uid": uid,
            "st_gid": gid,
            "st_ctime": pubunix,
            "st_mtime": pubunix,
        }

    def __update_status_field(self):
        """
        Update the internal statuses list with new statuses from the queue.
        """
        # TODO: Not thread-safe. But no idea what to do about it.
        # Don't set threading to true in FUSE constructor/options.
        try:
            while True:
                statuses = self.__queue.get_nowait()
                self.__statuses.extend(statuses)
        except Empty:
            # Queue drained; nothing more to merge.
            pass

    def list_dir(self) -> list[str]:
        """
        Drain the pending queue and return the ids of all known statuses.
        """
        self.__update_status_field()
        return [s.id for s in self.__statuses]

    def readdir(self, path, fh):
        """
        List the entries of the root directory.

        Args:
            path (str): Directory to list; only "/" exists.
            fh: Unused file handle.

        Returns:
            list[str]: ".", ".." and one entry per known status id.

        Raises:
            FuseOSError: With ENOENT for any path other than "/".
        """
        if path != "/":
            raise FuseOSError(errno.ENOENT)
        return [".", "..", *self.list_dir()]

    def add_statuses(self, statuses: list[Status]):
        """
        Add new statuses to the queue.

        Args:
            statuses (list): A list of Status objects to be added.
        """
        self.__queue.put(statuses)

    def open(self, path, flags):  # type: ignore
        """
        Return a fresh file handle number; ``path`` and ``flags`` are ignored.
        """
        self.__fd += 1
        return self.__fd

    def read(self, path, size, offset, fh):  # type: ignore
        """
        Read up to ``size`` bytes of a status file starting at ``offset``.

        Args:
            path (str): Absolute path of the status file.
            size (int): Maximum number of bytes to return.
            offset (int): Byte offset into the UTF-8 encoded content.
            fh: Unused file handle.

        Returns:
            bytes: The requested slice; empty once ``offset`` is past the end.

        Raises:
            FuseOSError: With ENOENT if no status matches ``path``.
        """
        found = self.__find_status(path)
        if found is None:
            raise FuseOSError(errno.ENOENT)
        data = found.content.encode("utf8")
        # Honour the requested window: returning the whole content regardless
        # of offset/size breaks partial reads and can exceed the caller's
        # buffer for content longer than ``size``.
        return data[offset : offset + size]