From 0a71d6c3941ed899dd042842081e6aeeda0cfc31 Mon Sep 17 00:00:00 2001 From: uvok Date: Fri, 24 Jan 2025 16:40:03 +0100 Subject: Fix name breaking loading --- de_uvok/activitypub_fuse/fuse.py | 94 --------------------------------- de_uvok/activitypub_fuse/status_fuse.py | 94 +++++++++++++++++++++++++++++++++ fuse-ap.py | 2 +- 3 files changed, 95 insertions(+), 95 deletions(-) delete mode 100644 de_uvok/activitypub_fuse/fuse.py create mode 100644 de_uvok/activitypub_fuse/status_fuse.py diff --git a/de_uvok/activitypub_fuse/fuse.py b/de_uvok/activitypub_fuse/fuse.py deleted file mode 100644 index 46eeb02..0000000 --- a/de_uvok/activitypub_fuse/fuse.py +++ /dev/null @@ -1,94 +0,0 @@ -from datetime import datetime as dtp -import errno -import stat -from queue import SimpleQueue, Empty -from fuse import ( - Operations, - FuseOSError, - LoggingMixIn, - fuse_get_context, -) - -from .types import Status - - -class StatusFileSystem(Operations, LoggingMixIn): - """ - Implements a FUSE file system for mapping (ActivityPub, Mastodon, ...) statuses to text files. - """ - - def __init__(self): - """ - Initialize a new instance of StatusFileSystem, to be used with FUSE. - """ - self.__queue: SimpleQueue[list[Status]] = SimpleQueue() - self.__statuses: list[Status] = [] - self.__fd = 0 - - def getattr(self, path, fh=None): - (uid, gid, _) = fuse_get_context() - if path == "/": - return { - "st_mode": (stat.S_IFDIR | 0o700), # Directory - "st_nlink": 2, - "st_uid": uid, - "st_gid": gid, - } - found = next((s for s in self.__statuses if s.id == path[1:]), None) - - if found: - published_dt = dtp.fromisoformat(found.published) - pubunix = published_dt.timestamp() - return { - "st_mode": (stat.S_IFREG | 0o400), - "st_size": len(found.content.encode("utf8")), - "st_nlink": 1, - "st_uid": uid, - "st_gid": gid, - "st_ctime": pubunix, - "st_mtime": pubunix, - } - raise FuseOSError(errno.ENOENT) - - def __update_status_field(self): - """ - Update the internal statuses list with new statuses from the queue. - """ - try: - while True: - statuses = self.__queue.get_nowait() - self.__statuses.extend(statuses) - except Empty: - pass - - def list_dir(self) -> list[str]: - self.__update_status_field() - return [s.id for s in self.__statuses] - - def readdir(self, path, fh): - dir_entries = [] - if path != "/": - raise FuseOSError(errno.ENOENT) - dir_entries = [".", ".."] - dir_entries += self.list_dir() - return dir_entries - - def add_statuses(self, statuses: list[Status]): - """ - Add new statuses to the queue. - - Args: - statuses (list): A list of Status objects to be added. - """ - self.__queue.put(statuses) - - def open(self, path, flags): # type: ignore - self.__fd += 1 - return self.__fd - - def read(self, path, size, offset, fh): # type: ignore - found = next(s for s in self.__statuses if s.id == path[1:]) - - if found: - return found.content.encode("utf8") - raise FuseOSError(errno.ENOENT) diff --git a/de_uvok/activitypub_fuse/status_fuse.py b/de_uvok/activitypub_fuse/status_fuse.py new file mode 100644 index 0000000..46eeb02 --- /dev/null +++ b/de_uvok/activitypub_fuse/status_fuse.py @@ -0,0 +1,94 @@ +from datetime import datetime as dtp +import errno +import stat +from queue import SimpleQueue, Empty +from fuse import ( + Operations, + FuseOSError, + LoggingMixIn, + fuse_get_context, +) + +from .types import Status + + +class StatusFileSystem(Operations, LoggingMixIn): + """ + Implements a FUSE file system for mapping (ActivityPub, Mastodon, ...) statuses to text files. + """ + + def __init__(self): + """ + Initialize a new instance of StatusFileSystem, to be used with FUSE. + """ + self.__queue: SimpleQueue[list[Status]] = SimpleQueue() + self.__statuses: list[Status] = [] + self.__fd = 0 + + def getattr(self, path, fh=None): + (uid, gid, _) = fuse_get_context() + if path == "/": + return { + "st_mode": (stat.S_IFDIR | 0o700), # Directory + "st_nlink": 2, + "st_uid": uid, + "st_gid": gid, + } + found = next((s for s in self.__statuses if s.id == path[1:]), None) + + if found: + published_dt = dtp.fromisoformat(found.published) + pubunix = published_dt.timestamp() + return { + "st_mode": (stat.S_IFREG | 0o400), + "st_size": len(found.content.encode("utf8")), + "st_nlink": 1, + "st_uid": uid, + "st_gid": gid, + "st_ctime": pubunix, + "st_mtime": pubunix, + } + raise FuseOSError(errno.ENOENT) + + def __update_status_field(self): + """ + Update the internal statuses list with new statuses from the queue. + """ + try: + while True: + statuses = self.__queue.get_nowait() + self.__statuses.extend(statuses) + except Empty: + pass + + def list_dir(self) -> list[str]: + self.__update_status_field() + return [s.id for s in self.__statuses] + + def readdir(self, path, fh): + dir_entries = [] + if path != "/": + raise FuseOSError(errno.ENOENT) + dir_entries = [".", ".."] + dir_entries += self.list_dir() + return dir_entries + + def add_statuses(self, statuses: list[Status]): + """ + Add new statuses to the queue. + + Args: + statuses (list): A list of Status objects to be added. + """ + self.__queue.put(statuses) + + def open(self, path, flags): # type: ignore + self.__fd += 1 + return self.__fd + + def read(self, path, size, offset, fh): # type: ignore + found = next(s for s in self.__statuses if s.id == path[1:]) + + if found: + return found.content.encode("utf8") + raise FuseOSError(errno.ENOENT) diff --git a/fuse-ap.py b/fuse-ap.py index fb476e9..d347405 100644 --- a/fuse-ap.py +++ b/fuse-ap.py @@ -7,7 +7,7 @@ from threading import Thread, Event from fuse import FUSE, fuse_exit -from de_uvok.activitypub_fuse.fuse import StatusFileSystem +from de_uvok.activitypub_fuse.status_fuse import StatusFileSystem from de_uvok.activitypub_fuse.providers import ( ActivityPubStatusProvider, MastodonStatusProvider, -- cgit v1.2.3