import argparse import enum import errno import logging import time import requests import sys import stat from typing import Optional from threading import Thread, Lock, Event from queue import Queue import urllib.parse from fuse import FUSE, fuse_exit from de_uvok.activitypub_fuse.fuse import StatusFileSystem from de_uvok.activitypub_fuse.providers import ( ActivityPubStatusProvider, MastodonStatusProvider, StatusProvider, ) logger = logging.getLogger(__name__) class APIChoice(enum.Enum): ACTIVITYPUB = "ActivityPub" MASTODON = "Mastodon" def parse_arguments(): parser = argparse.ArgumentParser( description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon" ) parser.add_argument( "mountpoint", help="The directory where the filesystem will be mounted" ) group = parser.add_mutually_exclusive_group(required=True) group.add_argument( "-a", "--activitypub", action="store_true", help="Use ActivityPub API" ) group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API") parser.add_argument("-s", "--server", required=True, help="The server/host URL") parser.add_argument( "-u", "--username", required=True, help="The username to fetch statuses for" ) args = parser.parse_args() if args.activitypub: args.api_choice = APIChoice.ACTIVITYPUB elif args.mastodon: args.api_choice = APIChoice.MASTODON else: parser.error("Must choose either ActivityPub or Mastodon API") return args # todo: make this a loop supporting paging def status_fetcher(fs: StatusFileSystem, sp: StatusProvider, sig_quit: Event): max_id = "" while (max_id is not None) and not (sig_quit.wait(5)): logger.debug("Fetch statuses.") st_rep = sp.load_statuses(max_id) logger.debug("Add statuses to FS.") fs.add_statuses(st_rep[0]) max_id = st_rep[1] logger.debug("Waiting to fetch statuses.") logger.debug("Done.") def main(args): quit_evt = Event() t = None try: if args.api_choice == APIChoice.MASTODON: status_provider = MastodonStatusProvider(args.server, args.username) else: status_provider = ActivityPubStatusProvider(args.server, args.username) myfs = StatusFileSystem() t = Thread(target=status_fetcher, args=(myfs, status_provider, quit_evt)) t.start() FUSE(myfs, args.mountpoint, nothreads=True, foreground=True) except: fuse_exit() raise finally: quit_evt.set() if t: t.join() if __name__ == "__main__": logging.basicConfig(level=logging.INFO) logger.setLevel(logging.DEBUG) args = parse_arguments() main(args)