from datetime import datetime as dtp import errno import stat from threading import Lock from fuse import ( Operations, FuseOSError, LoggingMixIn, fuse_get_context, ) from .types import Status class StatusFileSystem(Operations, LoggingMixIn): def __init__(self): self._lock = Lock() with self._lock: self.statuses: list[Status] = [] self.fd = 0 def getattr(self, path, fh=None): (uid, gid, _) = fuse_get_context() if path == "/": return { "st_mode": (stat.S_IFDIR | 0o700), # Directory "st_nlink": 2, "st_uid": uid, "st_gid": gid, } with self._lock: found = next((s for s in self.statuses if s.id == path[1:]), None) if found: published_dt = dtp.fromisoformat(found.published) pubunix = published_dt.timestamp() return { "st_mode": (stat.S_IFREG | 0o400), "st_size": len(found.content.encode("utf8")), "st_nlink": 1, "st_uid": uid, "st_gid": gid, "st_ctime": pubunix, "st_mtime": pubunix, } raise FuseOSError(errno.ENOENT) def list_dir(self) -> list[str]: with self._lock: return [s.id for s in self.statuses] def readdir(self, path, fh): dir_entries = [] if path != "/": raise FuseOSError(errno.ENOENT) dir_entries = [".", ".."] dir_entries += self.list_dir() return dir_entries def add_statuses(self, statuses: list[Status]): with self._lock: self.statuses.extend(statuses) def open(self, path, flags): # type: ignore self.fd += 1 return self.fd def read(self, path, size, offset, fh): # type: ignore with self._lock: found = next(s for s in self.statuses if s.id == path[1:]) if found: return found.content.encode("utf8") raise FuseOSError(errno.ENOENT)