from argparse import Namespace
from enum import Enum
from pathlib import Path
from typing import Any, NamedTuple, Optional

from librespot.metadata import (
    AlbumId,
    ArtistId,
    EpisodeId,
    PlayableId,
    PlaylistId,
    ShowId,
    TrackId,
)
from librespot.util import bytes_to_hex

from zotify import Session
from zotify.config import Config
from zotify.file import TranscodingError
from zotify.loader import Loader
from zotify.playable import Track
from zotify.printer import Printer, PrintChannel
from zotify.utils import API_URL, AudioFormat, b62_to_hex


def client(args: Namespace) -> None:
    """
    Command line entry point: logs in, collects the requested items,
    resolves them to playables and downloads them.
    Args:
        args: Parsed command line arguments
    """
    config = Config(args)
    Printer(config)
    with Loader("Logging in..."):
        if config.credentials is False:
            session = Session()
        else:
            session = Session(
                cred_file=config.credentials, save=True, language=config.language
            )
    selection = Selection(session)

    try:
        if args.search:
            ids = selection.search(args.search, args.category)
        elif args.playlist:
            ids = selection.get("playlists", "items")
        elif args.followed:
            ids = selection.get("following?type=artist", "artists")
        elif args.liked_tracks:
            ids = selection.get("tracks", "items")
        elif args.liked_episodes:
            ids = selection.get("episodes", "items")
        elif args.download:
            ids = []
            for x in args.download:
                ids.extend(selection.from_file(x))
        elif args.urls:
            ids = args.urls
        else:
            # No input option was given; without this branch `ids` would be
            # unbound and the parse step below would fail with a NameError.
            Printer.print(PrintChannel.WARNINGS, "there is nothing to do")
            return
    except (FileNotFoundError, ValueError):
        Printer.print(PrintChannel.WARNINGS, "there is nothing to do")
        return

    app = App(config, session)
    with Loader("Parsing input..."):
        try:
            app.parse(ids)
        except (IndexError, TypeError) as e:
            Printer.print(PrintChannel.ERRORS, str(e))
    # Whatever was parsed before a possible error is still downloaded.
    app.download()


class Selection:
    """Interactive helpers that turn a search / library listing into URIs."""

    def __init__(self, session: Session):
        self.__session = session

    def search(
        self,
        search_text: str,
        category: Optional[list] = None,
    ) -> list[str]:
        """
        Searches for `search_text`, prints the results and asks the user
        which of them to keep.
        Args:
            search_text: Query string
            category: Result types to search for, all types when None or empty
        Returns:
            URIs of the selected results
        """
        # Resolved here instead of in the signature so that no mutable default
        # is shared between calls and an explicit None falls back to all types.
        if not category:
            category = ["track", "album", "artist", "playlist", "show", "episode"]
        categories = ",".join(category)
        resp = self.__session.api().invoke_url(
            API_URL + "search",
            {
                "q": search_text,
                "type": categories,
                "include_external": "audio",
                "market": self.__session.country(),
            },
            limit=10,
            offset=0,
        )

        count = 0
        links = []
        for c in categories.split(","):
            # The response is indexed by the category name with an "s" appended.
            label = c + "s"
            items = resp[label]["items"]
            if items:
                print(f"\n### {label.capitalize()} ###")
            for item in items:
                count += 1
                links.append(item)
                self.__print(count, item)
        return self.__get_selection(links)

    def get(self, item: str, suffix: str) -> list[str]:
        """
        Lists entries of the logged in user's library and asks the user
        which of them to keep.
        Args:
            item: Path appended to the "me/" API endpoint
            suffix: Key of the response holding the list of entries
        Returns:
            URIs of the selected entries
        """
        resp = self.__session.api().invoke_url(f"{API_URL}me/{item}", limit=50)[suffix]
        for number, entry in enumerate(resp, start=1):
            self.__print(number, entry)
        return self.__get_selection(resp)

    @staticmethod
    def from_file(file_path: Path) -> list[str]:
        """
        Reads one link per line from a text file.
        Args:
            file_path: Path of the UTF-8 encoded file to read
        Returns:
            Stripped, non-empty lines of the file
        """
        with open(file_path, "r", encoding="utf-8") as f:
            # Blank lines are dropped: they cannot be parsed as links.
            return [stripped for line in f if (stripped := line.strip())]

    @staticmethod
    def __get_selection(items: list[dict[str, Any]]) -> list[str]:
        """
        Prompts for a comma separated list of numbers and/or ranges
        (eg: 1,2,5-7) and maps them onto `items`.
        Args:
            items: Entries previously printed with 1-based numbering
        Returns:
            The "uri" value of each selected entry, in the order entered
        Raises:
            ValueError: A part is not a number or is outside 1..len(items)
        """

        def uri_of(number: int) -> str:
            # Reject 0 and out-of-range numbers explicitly; plain indexing
            # would wrap 0 around to the last entry or raise IndexError.
            if not 1 <= number <= len(items):
                raise ValueError(f"selection {number} is out of range")
            return items[number - 1]["uri"]

        print("\nResults to save (eg: 1,2,3 1-3)")
        selection = ""
        while not selection:
            selection = input("==> ")

        ids = []
        for part in selection.split(","):
            if "-" in part:
                bounds = part.split("-")
                for number in range(int(bounds[0]), int(bounds[1]) + 1):
                    ids.append(uri_of(number))
            else:
                ids.append(uri_of(int(part)))
        return ids

    @staticmethod
    def __print(i: int, item: dict[str, Any]) -> None:
        """Prints one numbered result line using the item's "name"."""
        print("{:<2} {:<77}".format(i, item["name"]))


class PlayableType(Enum):
    """Kinds of content that can be downloaded."""

    TRACK = "track"
    EPISODE = "episode"


class PlayableData(NamedTuple):
    """Everything needed to fetch one playable and place it on disk."""

    type: PlayableType
    id: PlayableId
    library: Path
    output: str


class App:
    """Resolves links into playables and downloads them."""

    __playable_list: list[PlayableData]

    def __init__(
        self,
        config: Config,
        session: Session,
    ):
        self.__config = config
        self.__session = session
        self.__playable_list = []

    def __add_album_tracks(self, album: Any) -> None:
        """Queues every track on every disc of an album metadata object."""
        for disc in album.disc:
            for track in disc.track:
                self.__playable_list.append(
                    PlayableData(
                        PlayableType.TRACK,
                        TrackId.from_hex(bytes_to_hex(track.gid)),
                        self.__config.music_library,
                        self.__config.output_album,
                    )
                )

    def __parse_album(self, hex_id: str) -> None:
        """Queues all tracks of the album with the given hex id."""
        album = self.__session.api().get_metadata_4_album(AlbumId.from_hex(hex_id))
        self.__add_album_tracks(album)

    def __parse_artist(self, hex_id: str) -> None:
        """Queues all tracks of the artist's album and single groups."""
        artist = self.__session.api().get_metadata_4_artist(ArtistId.from_hex(hex_id))
        for album in artist.album_group + artist.single_group:
            # NOTE(review): track gids go through bytes_to_hex() before
            # from_hex(), this gid does not - confirm which one is intended.
            album = self.__session.api().get_metadata_4_album(
                AlbumId.from_hex(album.gid)
            )
            self.__add_album_tracks(album)

    def __parse_playlist(self, b62_id: str) -> None:
        """Queues every item of the playlist with the given base62 id."""
        id_map = {PlayableType.TRACK: TrackId, PlayableType.EPISODE: EpisodeId}
        playlist = self.__session.api().get_playlist(PlaylistId(b62_id))
        for item in playlist.contents.items:
            # The second and third ":"-separated fields of the uri are used
            # as type and base62 id. PlayableType() raises ValueError for any
            # type other than "track" or "episode".
            split = item.uri.split(":")
            playable_type = PlayableType(split[1])
            playable_id = id_map[playable_type].from_base62(split[2])
            self.__playable_list.append(
                PlayableData(
                    playable_type,
                    playable_id,
                    self.__config.playlist_library,
                    self.__config.get(f"output_playlist_{playable_type.value}"),
                )
            )

    def __parse_show(self, hex_id: str) -> None:
        """Queues all episodes of the show with the given hex id."""
        show = self.__session.api().get_metadata_4_show(ShowId.from_hex(hex_id))
        for episode in show.episode:
            self.__playable_list.append(
                PlayableData(
                    PlayableType.EPISODE,
                    EpisodeId.from_hex(bytes_to_hex(episode.gid)),
                    self.__config.podcast_library,
                    self.__config.output_podcast,
                )
            )

    def __parse_track(self, hex_id: str) -> None:
        """Queues the single track with the given hex id."""
        self.__playable_list.append(
            PlayableData(
                PlayableType.TRACK,
                TrackId.from_hex(hex_id),
                self.__config.music_library,
                self.__config.output_album,
            )
        )

    def __parse_episode(self, hex_id: str) -> None:
        """Queues the single episode with the given hex id."""
        self.__playable_list.append(
            PlayableData(
                PlayableType.EPISODE,
                EpisodeId.from_hex(hex_id),
                self.__config.podcast_library,
                self.__config.output_podcast,
            )
        )

    def parse(self, links: list[str]) -> None:
        """
        Parses list of selected tracks/playlists/shows/etc...
        Args:
            links: List of links
        Raises:
            IndexError: A link is too short to contain an id
            TypeError: A link has an unknown type
        """
        # Playlists are looked up by base62 id, everything else by hex id.
        hex_parsers = {
            "album": self.__parse_album,
            "artist": self.__parse_artist,
            "show": self.__parse_show,
            "track": self.__parse_track,
            "episode": self.__parse_episode,
        }
        for link in links:
            link = link.rsplit("?", 1)[0]
            try:
                # The character 23 places from the end is used as separator,
                # i.e. the id is taken to be the last 22 characters; this
                # handles "/" and ":" separated links alike.
                separator = link[-23]
            except IndexError as err:
                raise IndexError(f'Parsing Error: Could not parse "{link}"') from err
            split = link.split(separator)
            _id = split[-1]
            id_type = split[-2]

            if id_type == "playlist":
                self.__parse_playlist(_id)
            elif id_type in hex_parsers:
                hex_parsers[id_type](b62_to_hex(_id))
            else:
                raise TypeError(f'Parsing Error: Unknown type "{id_type}"')

    def get_playable_list(self) -> list[PlayableData]:
        """Returns list of Playable items"""
        return self.__playable_list

    def download(self) -> None:
        """Downloads playable to local file"""
        for playable in self.__playable_list:
            if playable.type == PlayableType.TRACK:
                with Loader("Fetching track..."):
                    track = self.__session.get_track(
                        playable.id, self.__config.download_quality
                    )
            elif playable.type == PlayableType.EPISODE:
                with Loader("Fetching episode..."):
                    track = self.__session.get_episode(playable.id)
            else:
                Printer.print(
                    PrintChannel.SKIPS,
                    f'Download Error: Unknown playable content "{playable.type}"',
                )
                continue

            try:
                output = track.create_output(playable.library, playable.output)
            except FileExistsError as e:
                # Reported on the skips channel; move on to the next playable.
                Printer.print(PrintChannel.SKIPS, str(e))
                continue

            file = track.write_audio_stream(
                output,
                self.__config.chunk_size,
            )

            # Lyrics are only requested for Track instances.
            if self.__config.save_lyrics and isinstance(track, Track):
                with Loader("Fetching lyrics..."):
                    try:
                        track.get_lyrics().save(output)
                    except FileNotFoundError as e:
                        Printer.print(PrintChannel.SKIPS, str(e))

            Printer.print(PrintChannel.DOWNLOADS, f"\nDownloaded {track.name}")

            if self.__config.audio_format != AudioFormat.VORBIS:
                try:
                    # NOTE(review): every other Loader call passes only the
                    # message; confirm Loader accepts a channel as first arg.
                    with Loader(PrintChannel.PROGRESS, "Converting audio..."):
                        file.transcode(
                            self.__config.audio_format,
                            self.__config.transcode_bitrate
                            if self.__config.transcode_bitrate > 0
                            else None,
                            True,
                            self.__config.ffmpeg_path
                            if self.__config.ffmpeg_path != ""
                            else "ffmpeg",
                            self.__config.ffmpeg_args.split(),
                        )
                except TranscodingError as e:
                    # A failed conversion is reported; processing continues.
                    Printer.print(PrintChannel.ERRORS, str(e))

            if self.__config.save_metadata:
                with Loader("Writing metadata..."):
                    file.write_metadata(track.metadata)
                    file.write_cover_art(
                        track.get_cover_art(self.__config.artwork_size)
                    )