from datetime import datetime as dtp
import errno
import stat
from queue import SimpleQueue, Empty

from fuse import (
    Operations,
    FuseOSError,
    LoggingMixIn,
    fuse_get_context,
)

from .types import Status


# NOTE(review): with the bases in this order, any method defined by both
# ``Operations`` and ``LoggingMixIn`` resolves to the ``Operations`` version.
# Confirm against the installed fuse package that the mixin's logging still
# takes effect; if not, the bases probably need to be swapped.
class StatusFileSystem(Operations, LoggingMixIn):
    """FUSE operations that expose queued statuses as files.

    The root directory ("/") lists one entry per known status, named after
    the status ``id``. Each entry is a regular file whose bytes are the
    status ``content`` encoded as UTF-8.

    Statuses are handed over with :meth:`add_statuses`, which only enqueues
    them; the queue is drained into the in-memory list lazily, whenever a
    filesystem operation needs to look at the statuses.
    """

    def __init__(self):
        super().__init__()
        # Batches handed over by add_statuses() that were not merged yet.
        self.__queue: SimpleQueue[list[Status]] = SimpleQueue()
        # Every status merged so far, in arrival order.
        self.__statuses: list[Status] = []
        # Last file handle number returned by open().
        self.__fd = 0

    def __update_status_field(self):
        """Move every batch currently waiting in the queue into the list."""
        while True:
            try:
                batch = self.__queue.get_nowait()
            except Empty:
                return
            self.__statuses.extend(batch)

    def __find_status(self, path):
        """Return the status addressed by *path*, or ``None`` if unknown.

        *path* is expected in the form ``"/<status id>"``; the first
        character is dropped before comparing against ``Status.id``.
        Pending batches are merged first so that freshly added statuses can
        be resolved even if the directory has not been listed yet.
        """
        self.__update_status_field()
        name = path[1:]
        return next((s for s in self.__statuses if s.id == name), None)

    def getattr(self, path, fh=None):
        """Return the stat dictionary for *path*.

        The root is reported as a directory with mode 0o700; a known status
        is reported as a regular file with mode 0o400 whose size is the
        UTF-8 encoded length of its content and whose ctime/mtime are
        derived from ``Status.published``. Ownership is taken from the
        calling FUSE context.

        Raises:
            FuseOSError: with ``ENOENT`` when *path* matches no status.
        """
        (uid, gid, _) = fuse_get_context()
        if path == "/":
            return {
                "st_mode": (stat.S_IFDIR | 0o700),  # Directory
                "st_nlink": 2,
                "st_uid": uid,
                "st_gid": gid,
            }

        found = self.__find_status(path)
        if found is None:
            raise FuseOSError(errno.ENOENT)

        # NOTE(review): ``published`` must be a string accepted by
        # datetime.fromisoformat(); a value without UTC offset is treated
        # as local time by timestamp() - confirm this matches the producer.
        pubunix = dtp.fromisoformat(found.published).timestamp()
        return {
            "st_mode": (stat.S_IFREG | 0o400),
            "st_size": len(found.content.encode("utf8")),
            "st_nlink": 1,
            "st_uid": uid,
            "st_gid": gid,
            "st_ctime": pubunix,
            "st_mtime": pubunix,
        }

    def list_dir(self) -> list[str]:
        """Return the ids of all known statuses, merging pending ones first."""
        self.__update_status_field()
        return [s.id for s in self.__statuses]

    def readdir(self, path, fh):
        """Return the entries of the root directory.

        Only "/" exists as a directory, so any other *path* is rejected.

        Raises:
            FuseOSError: with ``ENOENT`` when *path* is not "/".
        """
        if path != "/":
            raise FuseOSError(errno.ENOENT)
        return [".", "..", *self.list_dir()]

    def add_statuses(self, statuses: list[Status]):
        """Enqueue *statuses*; they are merged on a later filesystem call."""
        self.__queue.put(statuses)

    def open(self, path, flags):  # type: ignore
        """Return a new file handle number; *path* and *flags* are not used."""
        self.__fd += 1
        return self.__fd

    def read(self, path, size, offset, fh):  # type: ignore
        """Return up to *size* bytes of the status content from *offset*.

        The content is encoded as UTF-8 and sliced, so reading past the end
        yields ``b""`` and the returned chunk is never longer than *size*.

        Raises:
            FuseOSError: with ``ENOENT`` when *path* matches no status.
        """
        found = self.__find_status(path)
        if found is None:
            raise FuseOSError(errno.ENOENT)
        data = found.content.encode("utf8")
        return data[offset:offset + size]