summaryrefslogtreecommitdiff
path: root/hello-fusepy.py
diff options
context:
space:
mode:
authoruvok2025-01-22 20:05:44 +0100
committeruvok2025-01-22 20:05:44 +0100
commit0a4b9f6c43cddcfa4355f58cc6c5fb91c216e8d4 (patch)
treef48060725095e3e44b2d3332b81321ddacf50a6a /hello-fusepy.py
parentd49fb0eca080111ec2571de33a95a290ba062bd9 (diff)
Split code into modules
Diffstat (limited to 'hello-fusepy.py')
-rw-r--r--hello-fusepy.py206
1 files changed, 11 insertions, 195 deletions
diff --git a/hello-fusepy.py b/hello-fusepy.py
index 175c9a3..a0a7d45 100644
--- a/hello-fusepy.py
+++ b/hello-fusepy.py
@@ -11,17 +11,18 @@ from typing import Optional
from threading import Thread, Lock, Event
from queue import Queue
-from datetime import datetime as dtp
-from fuse import (
- FUSE,
- Operations,
- FuseOSError,
- LoggingMixIn,
- fuse_exit,
- fuse_get_context,
-)
+
import urllib.parse
+from fuse import FUSE, fuse_exit
+
+from de_uvok.activitypub_fuse.status_fuse import StatusFileSystem
+from de_uvok.activitypub_fuse.status_provider import (
+ ActivityPubStatusProvider,
+ MastodonStatusProvider,
+ StatusProvider,
+)
+
logger = logging.getLogger(__name__)
@@ -31,191 +32,6 @@ class APIChoice(enum.Enum):
MASTODON = "Mastodon"
-api_url_ap_template = "https://{server}/users/{user}/outbox?page=true"
-api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup"
-api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses"
-
-
-class Status(object):
- def __init__(self, id: str, content: str, published: str):
- self.id = id
- self.content = content
- self.published = published
-
-
-class StatusProvider:
- def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]:
- raise NotImplementedError
-
- def _fallback_not_found(self):
- return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")]
-
- def _fallback_error(self, error_msg: str):
- return [Status("error", error_msg, "1970-01-01T00:00:00Z")]
-
-
-class ActivityPubStatusProvider(StatusProvider):
- def __init__(self, server: str, user: str):
- self.server = server
- self.user = user
-
- def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]:
- url = api_url_ap_template.format(server=self.server, user=self.user)
- if max_id:
- url += "&" + urllib.parse.urlencode({"max_id": max_id})
- logger.debug("Get AP status from %s", url)
- res = requests.get(url)
- if res.status_code == 404:
- return self._fallback_not_found(), None
-
- try:
- res.raise_for_status()
- except requests.exceptions.RequestException as e:
- logging.error("Request error: %s", e)
- return self._fallback_error(getattr(e, "message", str(e))), None
-
- stats = res.json()
- status_items = stats.get("orderedItems", None)
- if not status_items:
- return (
- self._fallback_error("Malformed content in querying AP outbox."),
- None,
- )
-
- # consider reposts for getting max_id...
- ss = [
- Status(
- s["object"]["id"].split("/")[-1],
- s["object"]["content"],
- s["object"]["published"],
- )
- for s in status_items
- if s.get("type", None) == "Create"
- and "object" in s
- and all(key in s["object"] for key in ["id", "content", "published"])
- ]
- # ... but don't return it
- return [s for s in ss if len(s.content)], ss[-1].id
-
-
-class MastodonStatusProvider(StatusProvider):
- def __init__(self, server: str, user: str):
- self.server = server
- self.user = user
- self.userid = 0
-
- def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]:
- url = api_url_m_lookup_template.format(server=self.server)
- url += "?" + urllib.parse.urlencode({"acct": self.user})
- res = requests.get(url)
- if res.status_code == 404:
- return self._fallback_not_found(), None
- try:
- res.raise_for_status()
- except requests.exceptions.RequestException as e:
- logging.error("Request error: %s", e)
- return self._fallback_error(getattr(e, "message", str(e))), None
-
- user = res.json()
- self.userid = user.get("id", None)
- if not self.userid:
- return self._fallback_error("Malformed content in querying user ID."), None
-
- url = api_url_m_status_template.format(
- server=self.server, uid=urllib.parse.quote(self.userid)
- )
-
- if max_id:
- url += "?" + urllib.parse.urlencode({"max_id": max_id})
- logger.debug("Get Masto status from %s", url)
-
- res = requests.get(url)
- if res.status_code == 404:
- return self._fallback_not_found(), None
- try:
- res.raise_for_status()
- except requests.exceptions.RequestException as e:
- logging.error("Request error: %s", e)
- return self._fallback_error(getattr(e, "message", str(e))), None
- statuses = res.json()
- # consider reposts for getting max_id...
- ss = [
- Status(
- s["id"],
- s["content"],
- s["created_at"],
- )
- for s in statuses
- if all(key in s for key in ["id", "content", "created_at"])
- ]
- # ... but don't return it
- return [s for s in ss if len(s.content)], ss[-1].id
-
-
-class StatusFileSystem(Operations, LoggingMixIn):
- def __init__(self):
- self._lock = Lock()
-
- with self._lock:
- self.statuses: list[Status] = []
-
- self.fd = 0
-
- def getattr(self, path, fh=None):
- (uid, gid, _) = fuse_get_context()
- if path == "/":
- return {
- "st_mode": (stat.S_IFDIR | 0o700), # Directory
- "st_nlink": 2,
- "st_uid": uid,
- "st_gid": gid,
- }
- with self._lock:
- found = next((s for s in self.statuses if s.id == path[1:]), None)
-
- if found:
- published_dt = dtp.fromisoformat(found.published)
- pubunix = published_dt.timestamp()
- return {
- "st_mode": (stat.S_IFREG | 0o400),
- "st_size": len(found.content.encode("utf8")),
- "st_nlink": 1,
- "st_uid": uid,
- "st_gid": gid,
- "st_ctime": pubunix,
- "st_mtime": pubunix,
- }
- raise FuseOSError(errno.ENOENT)
-
- def list_dir(self) -> list[str]:
- with self._lock:
- return [s.id for s in self.statuses]
-
- def readdir(self, path, fh):
- dir_entries = []
- if path != "/":
- raise FuseOSError(errno.ENOENT)
- dir_entries = [".", ".."]
- dir_entries += self.list_dir()
- return dir_entries
-
- def add_statuses(self, statuses: list[Status]):
- with self._lock:
- self.statuses.extend(statuses)
-
- def open(self, path, flags):
- self.fd += 1
- return self.fd
-
- def read(self, path, size, offset, fh):
- with self._lock:
- found = next(s for s in self.statuses if s.id == path[1:])
-
- if found:
- return found.content.encode("utf8")
- raise FuseOSError(errno.ENOENT)
-
-
def parse_arguments():
parser = argparse.ArgumentParser(
description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon"
@@ -272,7 +88,7 @@ def main(args):
t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt))
t.start()
- f = FUSE(myfs, args.mountpoint, nothreads=True, foreground=True)
+ FUSE(myfs, args.mountpoint, nothreads=True, foreground=True)
except:
fuse_exit()
raise