"""Command-line entry point that mounts a read-only FUSE filesystem of statuses.

Statuses are loaded by a background thread from either an ActivityPub or a
Mastodon status provider and are added to the mounted filesystem as they
arrive.
"""

import argparse
import enum
import logging
from threading import Thread, Event
from typing import Optional, Sequence

from fuse import FUSE, fuse_exit

from de_uvok.activitypub_fuse.status_fuse import StatusFileSystem
from de_uvok.activitypub_fuse.providers import (
    ActivityPubStatusProvider,
    MastodonStatusProvider,
    StatusProvider,
)

logger = logging.getLogger(__name__)


class APIChoice(enum.Enum):
    """API flavour selected on the command line."""

    ACTIVITYPUB = "ActivityPub"
    MASTODON = "Mastodon"


def parse_arguments(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        The parsed namespace with an additional ``api_choice`` attribute
        holding the selected :class:`APIChoice`.
    """
    parser = argparse.ArgumentParser(
        description="Mount a read-only FUSE filesystem for ActivityPub or Mastodon"
    )
    parser.add_argument(
        "mountpoint", help="The directory where the filesystem will be mounted"
    )

    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        "-a", "--activitypub", action="store_true", help="Use ActivityPub API"
    )
    group.add_argument("-m", "--mastodon", action="store_true", help="Use Mastodon API")

    parser.add_argument("-s", "--server", required=True, help="The server/host URL")
    parser.add_argument(
        "-u", "--username", required=True, help="The username to fetch statuses for"
    )

    args = parser.parse_args(argv)

    # The group is required and mutually exclusive, so argparse has already
    # rejected the "neither" and "both" cases: exactly one flag is set here.
    args.api_choice = (
        APIChoice.ACTIVITYPUB if args.activitypub else APIChoice.MASTODON
    )
    return args


def status_fetcher(fs: StatusFileSystem, sp: StatusProvider, sig_quit: Event):
    """Load statuses page by page and add them to the filesystem.

    Runs until the provider reports no further page (the second element of
    its result is ``None``) or until ``sig_quit`` is set.

    Args:
        fs: Filesystem that receives each loaded page via ``add_statuses``.
        sp: Provider whose ``load_statuses(max_id)`` result is indexed as
            ``[0]`` (the statuses) and ``[1]`` (the ``max_id`` for the next
            call).
        sig_quit: Event that stops the loop once it is set.
    """
    max_id = ""
    # wait(1) returns True as soon as sig_quit is set and False after the
    # one-second timeout, so it doubles as the pause before every request
    # (including the first one) and as the shutdown check.
    while (max_id is not None) and not sig_quit.wait(1):
        logger.debug("Fetch statuses.")
        st_rep = sp.load_statuses(max_id)
        logger.debug("Add statuses to FS.")
        fs.add_statuses(st_rep[0])
        max_id = st_rep[1]
        logger.debug("Waiting to fetch statuses.")
    logger.debug("Done.")


def main(args):
    """Start the fetcher thread and mount the filesystem in the foreground.

    Args:
        args: Namespace as returned by :func:`parse_arguments`; the
            attributes ``api_choice``, ``server``, ``username`` and
            ``mountpoint`` are read.

    Raises:
        BaseException: Anything raised during setup or by the ``FUSE(...)``
            call is re-raised after ``fuse_exit()`` has been called.
    """
    quit_evt = Event()
    fetch_thread = None
    try:
        if args.api_choice == APIChoice.MASTODON:
            status_provider = MastodonStatusProvider(args.server, args.username)
        else:
            status_provider = ActivityPubStatusProvider(args.server, args.username)

        myfs = StatusFileSystem()
        fetch_thread = Thread(
            target=status_fetcher, args=(myfs, status_provider, quit_evt)
        )
        fetch_thread.start()

        FUSE(myfs, args.mountpoint, nothreads=True, foreground=True)
    except BaseException:
        # Deliberately broad (KeyboardInterrupt included): the exception is
        # always re-raised after the exit request.
        # NOTE(review): this also runs when the failure happened before
        # FUSE(...) was reached - confirm fuse_exit() is safe without an
        # active mount.
        fuse_exit()
        raise
    finally:
        # Always release the fetcher so the process can terminate.
        quit_evt.set()
        if fetch_thread is not None:
            fetch_thread.join()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    # Keep the fuse loggers at INFO while the rest of the program logs at DEBUG.
    logging.getLogger("fuse").setLevel(logging.INFO)
    logging.getLogger("fuse.log-mixin").setLevel(logging.INFO)
    args = parse_arguments()
    main(args)