Better search
This commit is contained in:
parent
8d8d173a78
commit
2908dadc5b
6 changed files with 161 additions and 100 deletions
|
@ -21,7 +21,7 @@ Available on [zotify.xyz](https://zotify.xyz/zotify/zotify) and [GitHub](https:/
|
|||
*Non-premium accounts are limited to 160kbps
|
||||
|
||||
## Installation
|
||||
Requires Python 3.9 or greater. \
|
||||
Requires Python 3.10 or greater. \
|
||||
Optionally requires FFmpeg to save tracks as anything other than Ogg Vorbis.
|
||||
|
||||
Enter the following command in terminal to install Zotify. \
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
librespot@https://github.com/kokarare1212/librespot-python/archive/refs/heads/main.zip
|
||||
librespot>=0.0.9
|
||||
music-tag
|
||||
mutagen
|
||||
Pillow
|
||||
|
|
|
@ -17,9 +17,9 @@ classifiers =
|
|||
|
||||
[options]
|
||||
packages = zotify
|
||||
python_requires = >=3.9
|
||||
python_requires = >=3.10
|
||||
install_requires =
|
||||
librespot@https://github.com/kokarare1212/librespot-python/archive/refs/heads/main.zip
|
||||
librespot>=0.0.9
|
||||
music-tag
|
||||
mutagen
|
||||
Pillow
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
from argparse import ArgumentParser
|
||||
from pathlib import Path
|
||||
|
||||
from zotify.app import client
|
||||
from zotify.app import App
|
||||
from zotify.config import CONFIG_PATHS, CONFIG_VALUES
|
||||
from zotify.utils import OptionalOrFalse
|
||||
|
||||
|
@ -118,7 +118,7 @@ def main():
|
|||
help=v["help"],
|
||||
)
|
||||
|
||||
parser.set_defaults(func=client)
|
||||
parser.set_defaults(func=App)
|
||||
args = parser.parse_args()
|
||||
if args.version:
|
||||
print(VERSION)
|
||||
|
|
243
zotify/app.py
243
zotify/app.py
|
@ -23,46 +23,20 @@ from zotify.printer import Printer, PrintChannel
|
|||
from zotify.utils import API_URL, AudioFormat, b62_to_hex
|
||||
|
||||
|
||||
def client(args: Namespace) -> None:
|
||||
config = Config(args)
|
||||
Printer(config)
|
||||
with Loader("Logging in..."):
|
||||
if config.credentials is False:
|
||||
session = Session()
|
||||
else:
|
||||
session = Session(
|
||||
cred_file=config.credentials, save=True, language=config.language
|
||||
)
|
||||
selection = Selection(session)
|
||||
class ParsingError(RuntimeError):
|
||||
...
|
||||
|
||||
try:
|
||||
if args.search:
|
||||
ids = selection.search(args.search, args.category)
|
||||
elif args.playlist:
|
||||
ids = selection.get("playlists", "items")
|
||||
elif args.followed:
|
||||
ids = selection.get("following?type=artist", "artists")
|
||||
elif args.liked_tracks:
|
||||
ids = selection.get("tracks", "items")
|
||||
elif args.liked_episodes:
|
||||
ids = selection.get("episodes", "items")
|
||||
elif args.download:
|
||||
ids = []
|
||||
for x in args.download:
|
||||
ids.extend(selection.from_file(x))
|
||||
elif args.urls:
|
||||
ids = args.urls
|
||||
except (FileNotFoundError, ValueError):
|
||||
Printer.print(PrintChannel.WARNINGS, "there is nothing to do")
|
||||
return
|
||||
|
||||
app = App(config, session)
|
||||
with Loader("Parsing input..."):
|
||||
try:
|
||||
app.parse(ids)
|
||||
except (IndexError, TypeError) as e:
|
||||
Printer.print(PrintChannel.ERRORS, str(e))
|
||||
app.download()
|
||||
class PlayableType(Enum):
|
||||
TRACK = "track"
|
||||
EPISODE = "episode"
|
||||
|
||||
|
||||
class PlayableData(NamedTuple):
|
||||
type: PlayableType
|
||||
id: PlayableId
|
||||
library: Path
|
||||
output: str
|
||||
|
||||
|
||||
class Selection:
|
||||
|
@ -119,7 +93,7 @@ class Selection:
|
|||
|
||||
@staticmethod
|
||||
def __get_selection(items: list[dict[str, Any]]) -> list[str]:
|
||||
print("\nResults to save (eg: 1,2,3 1-3)")
|
||||
print("\nResults to save (eg: 1,2,5 1-3)")
|
||||
selection = ""
|
||||
while len(selection) == 0:
|
||||
selection = input("==> ")
|
||||
|
@ -134,34 +108,155 @@ class Selection:
|
|||
ids.append(items[int(i) - 1]["uri"])
|
||||
return ids
|
||||
|
||||
def __print(self, i: int, item: dict[str, Any]) -> None:
|
||||
match item["type"]:
|
||||
case "album":
|
||||
self.__print_album(i, item)
|
||||
case "playlist":
|
||||
self.__print_playlist(i, item)
|
||||
case "track":
|
||||
self.__print_track(i, item)
|
||||
case "show":
|
||||
self.__print_show(i, item)
|
||||
case _:
|
||||
print(
|
||||
"{:<2} {:<77}".format(i, self.__fix_string_length(item["name"], 77))
|
||||
)
|
||||
|
||||
def __print_album(self, i: int, item: dict[str, Any]) -> None:
|
||||
artists = ", ".join([artist["name"] for artist in item["artists"]])
|
||||
print(
|
||||
"{:<2} {:<38} {:<38}".format(
|
||||
i,
|
||||
self.__fix_string_length(item["name"], 38),
|
||||
self.__fix_string_length(artists, 38),
|
||||
)
|
||||
)
|
||||
|
||||
def __print_playlist(self, i: int, item: dict[str, Any]) -> None:
|
||||
print(
|
||||
"{:<2} {:<38} {:<38}".format(
|
||||
i,
|
||||
self.__fix_string_length(item["name"], 38),
|
||||
self.__fix_string_length(item["owner"]["display_name"], 38),
|
||||
)
|
||||
)
|
||||
|
||||
def __print_track(self, i: int, item: dict[str, Any]) -> None:
|
||||
artists = ", ".join([artist["name"] for artist in item["artists"]])
|
||||
print(
|
||||
"{:<2} {:<38} {:<38} {:<38}".format(
|
||||
i,
|
||||
self.__fix_string_length(item["name"], 38),
|
||||
self.__fix_string_length(artists, 38),
|
||||
self.__fix_string_length(item["album"]["name"], 38),
|
||||
)
|
||||
)
|
||||
|
||||
def __print_show(self, i: int, item: dict[str, Any]) -> None:
|
||||
print(
|
||||
"{:<2} {:<38} {:<38}".format(
|
||||
i,
|
||||
self.__fix_string_length(item["name"], 38),
|
||||
self.__fix_string_length(item["publisher"], 38),
|
||||
)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def __print(i: int, item: dict[str, Any]) -> None:
|
||||
print("{:<2} {:<77}".format(i, item["name"]))
|
||||
|
||||
|
||||
class PlayableType(Enum):
|
||||
TRACK = "track"
|
||||
EPISODE = "episode"
|
||||
|
||||
|
||||
class PlayableData(NamedTuple):
|
||||
type: PlayableType
|
||||
id: PlayableId
|
||||
library: Path
|
||||
output: str
|
||||
def __fix_string_length(text: str, max_length: int) -> str:
|
||||
if len(text) > max_length:
|
||||
return text[: max_length - 3] + "..."
|
||||
return text
|
||||
|
||||
|
||||
class App:
|
||||
__playable_list: list[PlayableData]
|
||||
__config: Config
|
||||
__session: Session
|
||||
__playable_list: list[PlayableData] = []
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
session: Session,
|
||||
args: Namespace,
|
||||
):
|
||||
self.__config = config
|
||||
self.__session = session
|
||||
self.__playable_list = []
|
||||
self.__config = Config(args)
|
||||
Printer(self.__config)
|
||||
|
||||
if self.__config.audio_format == AudioFormat.VORBIS and (self.__config.ffmpeg_args != "" or self.__config.ffmpeg_path != ""):
|
||||
Printer.print(PrintChannel.WARNINGS, "FFmpeg options will be ignored since no transcoding is required")
|
||||
|
||||
with Loader("Logging in..."):
|
||||
if self.__config.credentials is False:
|
||||
self.__session = Session()
|
||||
else:
|
||||
self.__session = Session(
|
||||
cred_file=self.__config.credentials,
|
||||
save=True,
|
||||
language=self.__config.language,
|
||||
)
|
||||
|
||||
ids = self.get_selection(args)
|
||||
with Loader("Parsing input..."):
|
||||
try:
|
||||
self.parse(ids)
|
||||
except (IndexError, TypeError) as e:
|
||||
Printer.print(PrintChannel.ERRORS, str(e))
|
||||
self.download()
|
||||
|
||||
def get_selection(self, args: Namespace) -> list[str]:
|
||||
selection = Selection(self.__session)
|
||||
try:
|
||||
if args.search:
|
||||
return selection.search(args.search, args.category)
|
||||
elif args.playlist:
|
||||
return selection.get("playlists", "items")
|
||||
elif args.followed:
|
||||
return selection.get("following?type=artist", "artists")
|
||||
elif args.liked_tracks:
|
||||
return selection.get("tracks", "items")
|
||||
elif args.liked_episodes:
|
||||
return selection.get("episodes", "items")
|
||||
elif args.download:
|
||||
ids = []
|
||||
for x in args.download:
|
||||
ids.extend(selection.from_file(x))
|
||||
return ids
|
||||
elif args.urls:
|
||||
return args.urls
|
||||
except (FileNotFoundError, ValueError):
|
||||
pass
|
||||
Printer.print(PrintChannel.WARNINGS, "there is nothing to do")
|
||||
exit()
|
||||
|
||||
def parse(self, links: list[str]) -> None:
|
||||
"""
|
||||
Parses list of selected tracks/playlists/shows/etc...
|
||||
Args:
|
||||
links: List of links
|
||||
"""
|
||||
for link in links:
|
||||
link = link.rsplit("?", 1)[0]
|
||||
try:
|
||||
split = link.split(link[-23])
|
||||
_id = split[-1]
|
||||
id_type = split[-2]
|
||||
except IndexError:
|
||||
raise ParsingError(f'Could not parse "{link}"')
|
||||
|
||||
match id_type:
|
||||
case "album":
|
||||
self.__parse_album(b62_to_hex(_id))
|
||||
case "artist":
|
||||
self.__parse_artist(b62_to_hex(_id))
|
||||
case "show":
|
||||
self.__parse_show(b62_to_hex(_id))
|
||||
case "track":
|
||||
self.__parse_track(b62_to_hex(_id))
|
||||
case "episode":
|
||||
self.__parse_episode(b62_to_hex(_id))
|
||||
case "playlist":
|
||||
self.__parse_playlist(_id)
|
||||
case _:
|
||||
raise ParsingError(f'Unknown content type "{id_type}"')
|
||||
|
||||
def __parse_album(self, hex_id: str) -> None:
|
||||
album = self.__session.api().get_metadata_4_album(AlbumId.from_hex(hex_id))
|
||||
|
@ -241,36 +336,6 @@ class App:
|
|||
)
|
||||
)
|
||||
|
||||
def parse(self, links: list[str]) -> None:
|
||||
"""
|
||||
Parses list of selected tracks/playlists/shows/etc...
|
||||
Args:
|
||||
links: List of links
|
||||
"""
|
||||
for link in links:
|
||||
link = link.rsplit("?", 1)[0]
|
||||
try:
|
||||
split = link.split(link[-23])
|
||||
_id = split[-1]
|
||||
id_type = split[-2]
|
||||
except IndexError:
|
||||
raise IndexError(f'Parsing Error: Could not parse "{link}"')
|
||||
|
||||
if id_type == "album":
|
||||
self.__parse_album(b62_to_hex(_id))
|
||||
elif id_type == "artist":
|
||||
self.__parse_artist(b62_to_hex(_id))
|
||||
elif id_type == "playlist":
|
||||
self.__parse_playlist(_id)
|
||||
elif id_type == "show":
|
||||
self.__parse_show(b62_to_hex(_id))
|
||||
elif id_type == "track":
|
||||
self.__parse_track(b62_to_hex(_id))
|
||||
elif id_type == "episode":
|
||||
self.__parse_episode(b62_to_hex(_id))
|
||||
else:
|
||||
raise TypeError(f'Parsing Error: Unknown type "{id_type}"')
|
||||
|
||||
def get_playable_list(self) -> list[PlayableData]:
|
||||
"""Returns list of Playable items"""
|
||||
return self.__playable_list
|
||||
|
|
|
@ -210,10 +210,6 @@ class Episode(PlayableContentFeeder.LoadedStream, Playable):
|
|||
"title": self.name,
|
||||
}
|
||||
|
||||
def can_download_direct(self) -> bool:
|
||||
"""Returns true if episode can be downloaded from its original external source"""
|
||||
return bool(self.external_url)
|
||||
|
||||
def write_audio_stream(
|
||||
self, output: Path, chunk_size: int = 128 * 1024
|
||||
) -> LocalFile:
|
||||
|
@ -225,7 +221,7 @@ class Episode(PlayableContentFeeder.LoadedStream, Playable):
|
|||
Returns:
|
||||
LocalFile object
|
||||
"""
|
||||
if not self.can_download_direct():
|
||||
if bool(self.external_url):
|
||||
return super().write_audio_stream(output, chunk_size)
|
||||
file = f"{output}.{self.external_url.rsplit('.', 1)[-1]}"
|
||||
with get(self.external_url, stream=True) as r, open(
|
||||
|
|
Loading…
Add table
Reference in a new issue