diff options
Diffstat (limited to 'de_uvok/activitypub_fuse/status_fuse.py')
-rw-r--r-- | de_uvok/activitypub_fuse/status_fuse.py | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/de_uvok/activitypub_fuse/status_fuse.py b/de_uvok/activitypub_fuse/status_fuse.py new file mode 100644 index 0000000..46eeb02 --- /dev/null +++ b/de_uvok/activitypub_fuse/status_fuse.py @@ -0,0 +1,94 @@ +from datetime import datetime as dtp +import errno +import stat +from queue import SimpleQueue, Empty +from fuse import ( + Operations, + FuseOSError, + LoggingMixIn, + fuse_get_context, +) + +from .types import Status + + +class StatusFileSystem(Operations, LoggingMixIn): + """ + Implements a FUSE file system for mapping (ActivityPub, Mastodon, ...) statuses to text files. + """ + + def __init__(self): + """ + Initialize a new instance of StatusFileSystem, to be used with FUSE. + """ + self.__queue: SimpleQueue[list[Status]] = SimpleQueue() + self.__statuses: list[Status] = [] + self.__fd = 0 + + def getattr(self, path, fh=None): + (uid, gid, _) = fuse_get_context() + if path == "/": + return { + "st_mode": (stat.S_IFDIR | 0o700), # Directory + "st_nlink": 2, + "st_uid": uid, + "st_gid": gid, + } + found = next((s for s in self.__statuses if s.id == path[1:]), None) + + if found: + published_dt = dtp.fromisoformat(found.published) + pubunix = published_dt.timestamp() + return { + "st_mode": (stat.S_IFREG | 0o400), + "st_size": len(found.content.encode("utf8")), + "st_nlink": 1, + "st_uid": uid, + "st_gid": gid, + "st_ctime": pubunix, + "st_mtime": pubunix, + } + raise FuseOSError(errno.ENOENT) + + def __update_status_field(self): + """ + Update the internal statuses list with new statuses from the queue. + """ + try: + while True: + statuses = self.__queue.get_nowait() + self.__statuses.extend(statuses) + except Empty: + pass + + def list_dir(self) -> list[str]: + self.__update_status_field() + return [s.id for s in self.__statuses] + + def readdir(self, path, fh): + dir_entries = [] + if path != "/": + raise FuseOSError(errno.ENOENT) + dir_entries = [".", ".."] + dir_entries += self.list_dir() + return dir_entries + + def add_statuses(self, statuses: list[Status]): + """ + Add new statuses to the queue. + + Args: + statuses (list): A list of Status objects to be added. + """ + self.__queue.put(statuses) + + def open(self, path, flags): # type: ignore + self.__fd += 1 + return self.__fd + + def read(self, path, size, offset, fh): # type: ignore + found = next(s for s in self.__statuses if s.id == path[1:]) + + if found: + return found.content.encode("utf8") + raise FuseOSError(errno.ENOENT) |