summaryrefslogtreecommitdiff
path: root/de_uvok/activitypub_fuse/status_fuse.py
diff options
context:
space:
mode:
Diffstat (limited to 'de_uvok/activitypub_fuse/status_fuse.py')
-rw-r--r--de_uvok/activitypub_fuse/status_fuse.py94
1 files changed, 94 insertions, 0 deletions
diff --git a/de_uvok/activitypub_fuse/status_fuse.py b/de_uvok/activitypub_fuse/status_fuse.py
new file mode 100644
index 0000000..46eeb02
--- /dev/null
+++ b/de_uvok/activitypub_fuse/status_fuse.py
@@ -0,0 +1,94 @@
+from datetime import datetime as dtp
+import errno
+import stat
+from queue import SimpleQueue, Empty
+from fuse import (
+ Operations,
+ FuseOSError,
+ LoggingMixIn,
+ fuse_get_context,
+)
+
+from .types import Status
+
+
+class StatusFileSystem(Operations, LoggingMixIn):
+ """
+ Implements a FUSE file system for mapping (ActivityPub, Mastodon, ...) statuses to text files.
+ """
+
+ def __init__(self):
+ """
+ Initialize a new instance of StatusFileSystem, to be used with FUSE.
+ """
+ self.__queue: SimpleQueue[list[Status]] = SimpleQueue()
+ self.__statuses: list[Status] = []
+ self.__fd = 0
+
+ def getattr(self, path, fh=None):
+ (uid, gid, _) = fuse_get_context()
+ if path == "/":
+ return {
+ "st_mode": (stat.S_IFDIR | 0o700), # Directory
+ "st_nlink": 2,
+ "st_uid": uid,
+ "st_gid": gid,
+ }
+ found = next((s for s in self.__statuses if s.id == path[1:]), None)
+
+ if found:
+ published_dt = dtp.fromisoformat(found.published)
+ pubunix = published_dt.timestamp()
+ return {
+ "st_mode": (stat.S_IFREG | 0o400),
+ "st_size": len(found.content.encode("utf8")),
+ "st_nlink": 1,
+ "st_uid": uid,
+ "st_gid": gid,
+ "st_ctime": pubunix,
+ "st_mtime": pubunix,
+ }
+ raise FuseOSError(errno.ENOENT)
+
+ def __update_status_field(self):
+ """
+ Update the internal statuses list with new statuses from the queue.
+ """
+ try:
+ while True:
+ statuses = self.__queue.get_nowait()
+ self.__statuses.extend(statuses)
+ except Empty:
+ pass
+
+ def list_dir(self) -> list[str]:
+ self.__update_status_field()
+ return [s.id for s in self.__statuses]
+
+ def readdir(self, path, fh):
+ dir_entries = []
+ if path != "/":
+ raise FuseOSError(errno.ENOENT)
+ dir_entries = [".", ".."]
+ dir_entries += self.list_dir()
+ return dir_entries
+
+ def add_statuses(self, statuses: list[Status]):
+ """
+ Add new statuses to the queue.
+
+ Args:
+ statuses (list): A list of Status objects to be added.
+ """
+ self.__queue.put(statuses)
+
+ def open(self, path, flags): # type: ignore
+ self.__fd += 1
+ return self.__fd
+
+ def read(self, path, size, offset, fh): # type: ignore
+ found = next(s for s in self.__statuses if s.id == path[1:])
+
+ if found:
+ return found.content.encode("utf8")
+ raise FuseOSError(errno.ENOENT)