diff options
| author | uvok | 2025-01-23 19:53:51 +0100 | 
|---|---|---|
| committer | uvok | 2025-01-23 19:53:51 +0100 | 
| commit | 0e271287e942bfea8d1f3753a5725553e7500650 (patch) | |
| tree | fff2b42145ee194d0ecbf08a5f989baa55106586 | |
| parent | 91c02046f3da8681094c6bca32f1a2b6d7786682 (diff) | |
| parent | 29ad7bd09839ec958ee40a20442f2ab337da86e1 (diff) | |
Merge branch 'threading'
| -rw-r--r-- | .gitignore | 3 | ||||
| -rw-r--r-- | de_uvok/activitypub_fuse/__init__.py | 0 | ||||
| -rw-r--r-- | de_uvok/activitypub_fuse/fuse.py | 76 | ||||
| -rw-r--r-- | de_uvok/activitypub_fuse/providers.py | 122 | ||||
| -rw-r--r-- | de_uvok/activitypub_fuse/types.py | 5 | ||||
| -rw-r--r-- | fuse-ap.py | 105 | ||||
| -rw-r--r-- | hello-fusepy.py | 235 | 
7 files changed, 311 insertions, 235 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..da4129b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/venv/ +*.pyc +/.vscode/ diff --git a/de_uvok/activitypub_fuse/__init__.py b/de_uvok/activitypub_fuse/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/de_uvok/activitypub_fuse/__init__.py diff --git a/de_uvok/activitypub_fuse/fuse.py b/de_uvok/activitypub_fuse/fuse.py new file mode 100644 index 0000000..8be5351 --- /dev/null +++ b/de_uvok/activitypub_fuse/fuse.py @@ -0,0 +1,76 @@ +from datetime import datetime as dtp +import errno +import stat +from threading import Lock +from fuse import ( +    Operations, +    FuseOSError, +    LoggingMixIn, +    fuse_get_context, +) + +from .types import Status + + +class StatusFileSystem(Operations, LoggingMixIn): +    def __init__(self): +        self._lock = Lock() + +        with self._lock: +            self.statuses: list[Status] = [] + +        self.fd = 0 + +    def getattr(self, path, fh=None): +        (uid, gid, _) = fuse_get_context() +        if path == "/": +            return { +                "st_mode": (stat.S_IFDIR | 0o700),  # Directory +                "st_nlink": 2, +                "st_uid": uid, +                "st_gid": gid, +            } +        with self._lock: +            found = next((s for s in self.statuses if s.id == path[1:]), None) + +        if found: +            published_dt = dtp.fromisoformat(found.published) +            pubunix = published_dt.timestamp() +            return { +                "st_mode": (stat.S_IFREG | 0o400), +                "st_size": len(found.content.encode("utf8")), +                "st_nlink": 1, +                "st_uid": uid, +                "st_gid": gid, +                "st_ctime": pubunix, +                "st_mtime": pubunix, +            } +        raise FuseOSError(errno.ENOENT) + +    def list_dir(self) -> list[str]: +        with self._lock: +            return [s.id for s in self.statuses] + +    def readdir(self, path, fh): +        dir_entries = [] +        if path != "/": +            raise FuseOSError(errno.ENOENT) +        dir_entries = [".", ".."] +        dir_entries += self.list_dir() +        return dir_entries + +    def add_statuses(self, statuses: list[Status]): +        with self._lock: +            self.statuses.extend(statuses) + +    def open(self, path, flags):  # type: ignore +        self.fd += 1 +        return self.fd + +    def read(self, path, size, offset, fh):  # type: ignore +        with self._lock: +            found = next(s for s in self.statuses if s.id == path[1:]) + +        if found: +            return found.content.encode("utf8") +        raise FuseOSError(errno.ENOENT) diff --git a/de_uvok/activitypub_fuse/providers.py b/de_uvok/activitypub_fuse/providers.py new file mode 100644 index 0000000..19cee62 --- /dev/null +++ b/de_uvok/activitypub_fuse/providers.py @@ -0,0 +1,122 @@ +from typing import Optional + +import logging +import requests +import urllib.parse + +from .types import Status + +logger = logging.Logger(__name__) + +_api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" +_api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" +_api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" + + +class StatusProvider: +    def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: +        raise NotImplementedError + +    def _fallback_not_found(self): +        return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] + +    def _fallback_error(self, error_msg: str): +        return [Status("error", error_msg, "1970-01-01T00:00:00Z")] + + +class ActivityPubStatusProvider(StatusProvider): +    def __init__(self, server: str, user: str): +        self.server = server +        self.user = user + +    def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: +        url = _api_url_ap_template.format(server=self.server, user=self.user) +        if max_id: +            url += "&" + urllib.parse.urlencode({"max_id": max_id}) +        logger.debug("Get AP status from %s", url) +        res = requests.get(url) +        if res.status_code == 404: +            return self._fallback_not_found(), None + +        try: +            res.raise_for_status() +        except requests.exceptions.RequestException as e: +            logger.error("Request error: %s", e) +            return self._fallback_error(getattr(e, "message", str(e))), None + +        stats = res.json() +        status_items = stats.get("orderedItems", None) +        if not status_items: +            return ( +                self._fallback_error("Malformed content in querying AP outbox."), +                None, +            ) + +        # consider reposts for getting max_id... +        ss = [ +            Status( +                s["object"]["id"].split("/")[-1], +                s["object"]["content"], +                s["object"]["published"], +            ) +            for s in status_items +            if s.get("type", None) == "Create" +            and "object" in s +            and all(key in s["object"] for key in ["id", "content", "published"]) +        ] +        # ... but don't return it +        return [s for s in ss if len(s.content)], ss[-1].id + + +class MastodonStatusProvider(StatusProvider): +    def __init__(self, server: str, user: str): +        self.server = server +        self.user = user +        self.userid = 0 + +    def load_statuses(self, max_id="") -> tuple[list[Status], Optional[str]]: +        url = _api_url_m_lookup_template.format(server=self.server) +        url += "?" + urllib.parse.urlencode({"acct": self.user}) +        res = requests.get(url) +        if res.status_code == 404: +            return self._fallback_not_found(), None +        try: +            res.raise_for_status() +        except requests.exceptions.RequestException as e: +            logger.error("Request error: %s", e) +            return self._fallback_error(getattr(e, "message", str(e))), None + +        user = res.json() +        self.userid = user.get("id", None) +        if not self.userid: +            return self._fallback_error("Malformed content in querying user ID."), None + +        url = _api_url_m_status_template.format( +            server=self.server, uid=urllib.parse.quote(self.userid) +        ) + +        if max_id: +            url += "?" + urllib.parse.urlencode({"max_id": max_id}) +        logger.debug("Get Masto status from %s", url) + +        res = requests.get(url) +        if res.status_code == 404: +            return self._fallback_not_found(), None +        try: +            res.raise_for_status() +        except requests.exceptions.RequestException as e: +            logger.error("Request error: %s", e) +            return self._fallback_error(getattr(e, "message", str(e))), None +        statuses = res.json() +        # consider reposts for getting max_id... +        ss = [ +            Status( +                s["id"], +                s["content"], +                s["created_at"], +            ) +            for s in statuses +            if all(key in s for key in ["id", "content", "created_at"]) +        ] +        # ... but don't return it +        return [s for s in ss if len(s.content)], ss[-1].id diff --git a/de_uvok/activitypub_fuse/types.py b/de_uvok/activitypub_fuse/types.py new file mode 100644 index 0000000..d354335 --- /dev/null +++ b/de_uvok/activitypub_fuse/types.py @@ -0,0 +1,5 @@ +class Status(object): +    def __init__(self, id: str, content: str, published: str): +        self.id = id +        self.content = content +        self.published = published diff --git a/fuse-ap.py b/fuse-ap.py new file mode 100644 index 0000000..f3fd478 --- /dev/null +++ b/fuse-ap.py @@ -0,0 +1,105 @@ +import argparse +import enum +import errno +import logging +import time +import requests +import sys +import stat + +from typing import Optional +from threading import Thread, Lock, Event +from queue import Queue + + +import urllib.parse + +from fuse import FUSE, fuse_exit + +from de_uvok.activitypub_fuse.fuse import StatusFileSystem +from de_uvok.activitypub_fuse.providers import ( +    ActivityPubStatusProvider, +    MastodonStatusProvider, +    StatusProvider, +) + + +logger = logging.getLogger(__name__) + + +class APIChoice(enum.Enum): +    ACTIVITYPUB = "ActivityPub" +    MASTODON = "Mastodon" + + +def parse_arguments(): +    parser = argparse.ArgumentParser( +        description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" +    ) +    parser.add_argument( +        "mountpoint", help="The directory where the filesystem will be mounted" +    ) +    group = parser.add_mutually_exclusive_group(required=True) +    group.add_argument( +        "-a", "--activitypub", action="store_true", help="Use ActivityPub API" +    ) +    group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API") +    parser.add_argument("-s", "--server", required=True, help="The server/host URL") +    parser.add_argument( +        "-u", "--username", required=True, help="The username to fetch statuses for" +    ) + +    args = parser.parse_args() + +    if args.activitypub: +        args.api_choice = APIChoice.ACTIVITYPUB +    elif args.mastodon: +        args.api_choice = APIChoice.MASTODON +    else: +        parser.error("Must choose either ActivityPub or Mastodon API") + +    return args + + +# todo: make this a loop supporting paging +def status_fetcher(fs: StatusFileSystem, sp: StatusProvider, sig_quit: Event): +    max_id = "" +    while (max_id is not None) and not (sig_quit.wait(5)): +        logger.debug("Fetch statuses.") +        st_rep = sp.load_statuses(max_id) +        logger.debug("Add statuses to FS.") +        fs.add_statuses(st_rep[0]) +        max_id = st_rep[1] +        logger.debug("Waiting to fetch statuses.") +    logger.debug("Done.") + + +def main(args): +    quit_evt = Event() +    t = None +    try: +        if args.api_choice == APIChoice.MASTODON: +            status_provider = MastodonStatusProvider(args.server, args.username) +        else: +            status_provider = ActivityPubStatusProvider(args.server, args.username) + +        myfs = StatusFileSystem() + +        t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt)) +        t.start() + +        FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) +    except: +        fuse_exit() +        raise +    finally: +        quit_evt.set() +        if t: +            t.join() + + +if __name__ == "__main__": +    logging.basicConfig(level=logging.INFO) +    logger.setLevel(logging.DEBUG) +    args = parse_arguments() +    main(args) diff --git a/hello-fusepy.py b/hello-fusepy.py deleted file mode 100644 index fdeec00..0000000 --- a/hello-fusepy.py +++ /dev/null @@ -1,235 +0,0 @@ -import argparse -import enum -import errno -import logging -import requests -import sys -import stat - -from datetime import datetime as dtp -from fuse import ( -    FUSE, -    Operations, -    FuseOSError, -    LoggingMixIn, -    fuse_exit, -    fuse_get_context, -) -import urllib.parse - - -class APIChoice(enum.Enum): -    ACTIVITYPUB = "ActivityPub" -    MASTODON = "Mastodon" - - -api_url_ap_template = "https://{server}/users/{user}/outbox?page=true" -api_url_m_lookup_template = "https://{server}/api/v1/accounts/lookup" -api_url_m_status_template = "https://{server}/api/v1/accounts/{uid}/statuses" - - -class Status(object): -    def __init__(self, id: str, content: str, published: str): -        self.id = id -        self.content = content -        self.published = published - - -class StatusProvider: -    def load_statuses(self) -> list[Status]: -        raise NotImplementedError - -    def _fallback_not_found(self): -        return [Status("not-found", "User not found", "1970-01-01T00:00:00Z")] - -    def _fallback_error(self, error_msg: str): -        return [Status("error", error_msg, "1970-01-01T00:00:00Z")] - - -class ActivityPubStatusProvider(StatusProvider): -    def __init__(self, server: str, user: str): -        self.server = server -        self.user = user - -    def load_statuses(self) -> list[Status]: -        url = api_url_ap_template.format(server=self.server, user=self.user) -        res = requests.get(url) -        if res.status_code == 404: -            return self._fallback_not_found() - -        try: -            res.raise_for_status() -        except requests.exceptions.RequestException as e: -            logging.error("Request error: %s", e) -            return self._fallback_error(getattr(e, "message", str(e))) - -        stats = res.json() -        status_items = stats.get("orderedItems", None) -        if not status_items: -            return self._fallback_error("Malformed content in querying AP outbox.") - -        return [ -            Status( -                s["object"]["id"].split("/")[-1], -                s["object"]["content"], -                s["object"]["published"], -            ) -            for s in status_items -            if s.get("type", None) == "Create" -            and "object" in s -            and all(key in s["object"] for key in ["id", "content", "published"]) -            and len(s["object"]["content"]) -        ] - - -class MastodonStatusProvider(StatusProvider): -    def __init__(self, server: str, user: str): -        self.server = server -        self.user = user -        self.userid = 0 - -    def load_statuses(self) -> list[Status]: -        url = api_url_m_lookup_template.format(server=self.server) -        url += "?" + urllib.parse.urlencode({"acct": self.user}) -        res = requests.get(url) -        if res.status_code == 404: -            return self._fallback_not_found() -        try: -            res.raise_for_status() -        except requests.exceptions.RequestException as e: -            logging.error("Request error: %s", e) -            return self._fallback_error(getattr(e, "message", str(e))) - -        user = res.json() -        self.userid = user.get("id", None) -        if not self.userid: -            return self._fallback_error("Malformed content in querying user ID.") - -        url = api_url_m_status_template.format( -            server=self.server, uid=urllib.parse.quote(self.userid) -        ) -        res = requests.get(url) -        if res.status_code == 404: -            return self._fallback_not_found() -        try: -            res.raise_for_status() -        except requests.exceptions.RequestException as e: -            logging.error("Request error: %s", e) -            return self._fallback_error(getattr(e, "message", str(e))) -        statuses = res.json() -        return [ -            Status( -                s["id"], -                s["content"], -                s["created_at"], -            ) -            for s in statuses -            if all(key in s for key in ["id", "content", "created_at"]) -            and len(s["content"]) -        ] - - -class StatusFileSystem(Operations, LoggingMixIn): -    def __init__(self, api: APIChoice, server: str, user: str): -        self.statuses: list[Status] = [] -        self.fd = 0 -        self.api = api -        if api == APIChoice.MASTODON: -            self.status_provider = MastodonStatusProvider(server, user) -        else: -            self.status_provider = ActivityPubStatusProvider(server, user) - -    def getattr(self, path, fh=None): -        (uid, gid, _) = fuse_get_context() -        if path == "/": -            return { -                "st_mode": (stat.S_IFDIR | 0o700),  # Directory -                "st_nlink": 2, -                "st_uid": uid, -                "st_gid": gid, -            } -        found = next((s for s in self.statuses if s.id == path[1:]), None) -        if found: -            published_dt = dtp.fromisoformat(found.published) -            pubunix = published_dt.timestamp() -            return { -                "st_mode": (stat.S_IFREG | 0o400), -                "st_size": len(found.content.encode("utf8")), -                "st_nlink": 1, -                "st_uid": uid, -                "st_gid": gid, -                "st_ctime": pubunix, -                "st_mtime": pubunix, -            } -        raise FuseOSError(errno.ENOENT) - -    def list_dir(self) -> list[str]: -        return [s.id for s in self.statuses] - -    def readdir(self, path, fh): -        dir_entries = [] -        if path != "/": -            raise FuseOSError(errno.ENOENT) -        dir_entries = [".", ".."] -        if not self.statuses: -            self.statuses = self.status_provider.load_statuses() -        dir_entries += self.list_dir() -        return dir_entries - -    def open(self, path, flags): -        self.fd += 1 -        return self.fd - -    def read(self, path, size, offset, fh): -        found = next(s for s in self.statuses if s.id == path[1:]) -        if found: -            return found.content.encode("utf8") -        raise FuseOSError(errno.ENOENT) - - -def parse_arguments(): -    parser = argparse.ArgumentParser( -        description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" -    ) -    parser.add_argument( -        "mountpoint", help="The directory where the filesystem will be mounted" -    ) -    group = parser.add_mutually_exclusive_group(required=True) -    group.add_argument( -        "-a", "--activitypub", action="store_true", help="Use ActivityPub API" -    ) -    group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API") -    parser.add_argument("-s", "--server", required=True, help="The server/host URL") -    parser.add_argument( -        "-u", "--username", required=True, help="The username to fetch statuses for" -    ) - -    args = parser.parse_args() - -    if args.activitypub: -        args.api_choice = APIChoice.ACTIVITYPUB -    elif args.mastodon: -        args.api_choice = APIChoice.MASTODON -    else: -        parser.error("Must choose either ActivityPub or Mastodon API") - -    return args - - -def main(args): -    try: -        FUSE( -            StatusFileSystem(args.api_choice, args.server, args.username), -            args.mountpoint, -            nothreads=True, -            foreground=True, -        ) -    except: -        fuse_exit() -        raise - - -if __name__ == "__main__": -    logging.basicConfig(level=logging.DEBUG) -    args = parse_arguments() -    main(args)  | 
