Added --skip-previous implementation

This commit is contained in:
DraftKinner 2025-02-07 18:19:00 -05:00
parent e829b39683
commit 9d3441ffd7
5 changed files with 96 additions and 12 deletions

View file

@ -146,6 +146,7 @@ class Selection:
class App:
def __init__(self, args: Namespace):
self.__config = Config(args)
self.__existing = {}
Logger(self.__config)
# Create session
@ -180,6 +181,11 @@ class App:
Logger.log(LogChannel.ERRORS, str(e))
exit(1)
if len(collections) > 0:
self.scan(
collections,
self.__config.skip_previous,
self.__config.skip_duplicates,
)
self.download_all(collections)
else:
Logger.log(LogChannel.WARNINGS, "there is nothing to do")
@ -240,6 +246,21 @@ class App:
raise ParseError(f'Unsupported content type "{id_type}"')
return collections
def scan(
self,
collections: list[Collection],
skip_previous: bool,
skip_duplicate: bool,
):
if skip_previous:
for collection in collections:
existing = collection.get_existing(
self.__config.audio_format.value.ext
)
self.__existing.update(existing)
if skip_duplicate:
pass
def download_all(self, collections: list[Collection]) -> None:
count = 0
total = sum(len(c.playables) for c in collections)
@ -247,6 +268,13 @@ class App:
for playable in collection.playables:
count += 1
if playable.existing:
Logger.log(
LogChannel.SKIPS,
f'Skipping "{self.__existing[playable.id]}": Previously downloaded',
)
continue
# Get track data
if playable.type == PlayableType.TRACK:
with Loader("Fetching track..."):
@ -257,7 +285,7 @@ class App:
except RuntimeError as err:
Logger.log(
LogChannel.SKIPS,
f'Skipping song id = {playable.id}: {err}',
f'Skipping track id = {playable.id}: {err}',
)
continue
elif playable.type == PlayableType.EPISODE:

View file

@ -1,3 +1,6 @@
from pathlib import Path
from glob import iglob
from librespot.metadata import (
AlbumId,
ArtistId,
@ -7,17 +10,55 @@ from librespot.metadata import (
from zotify import ApiClient
from zotify.config import Config
from zotify.utils import MetadataEntry, PlayableData, PlayableType, bytes_to_base62
from zotify.file import LocalFile
from zotify.utils import (
MetadataEntry,
PlayableData,
PlayableType,
bytes_to_base62,
fix_filename,
)
class Collection:
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
def __init__(self):
self.playables: list[PlayableData] = []
def get_existing(self, ext: str) -> dict[str, str]:
existing: dict[str, str] = {}
meta_tags = ["album_artist", "album", "podcast", "playlist"]
library = Path(self.playables[0].library)
output = self.playables[0].output_template
metadata = self.playables[0].metadata
id_type = self.playables[0].type
for meta in metadata:
if meta.name in meta_tags:
output = output.replace(
"{" + meta.name + "}", fix_filename(meta.string)
)
collection_path = library.joinpath(output).expanduser()
if collection_path.parent.exists():
file_path = "*.{}".format(ext)
scan_path = str(collection_path.parent.joinpath(file_path))
# Check contents of path
for file in iglob(scan_path):
f_path = Path(file)
f = LocalFile(f_path)
existing[f.get_metadata("key")] = f_path.stem
for playable in self.playables:
if playable.id in existing.keys():
playable.existing = True
return existing
class Album(Collection):
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
super().__init__(b62_id, api, config)
super().__init__()
album = api.get_metadata_4_album(AlbumId.from_base62(b62_id))
for disc in album.disc:
for track in disc.track:
@ -35,7 +76,7 @@ class Album(Collection):
class Artist(Collection):
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
super().__init__(b62_id, api, config)
super().__init__()
artist = api.get_metadata_4_artist(ArtistId.from_base62(b62_id))
for album_group in (
artist.album_group
@ -60,7 +101,7 @@ class Artist(Collection):
class Show(Collection):
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
super().__init__(b62_id, api, config)
super().__init__()
show = api.get_metadata_4_show(ShowId.from_base62(b62_id))
for episode in show.episode:
metadata = [MetadataEntry("key", bytes_to_base62(episode.gid))]
@ -77,7 +118,7 @@ class Show(Collection):
class Playlist(Collection):
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
super().__init__(b62_id, api, config)
super().__init__()
playlist = api.get_playlist(PlaylistId(b62_id))
for i in range(len(playlist.contents.items)):
item = playlist.contents.items[i]
@ -124,7 +165,7 @@ class Playlist(Collection):
class Track(Collection):
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
super().__init__(b62_id, api, config)
super().__init__()
metadata = [MetadataEntry("key", b62_id)]
self.playables.append(
PlayableData(
@ -139,7 +180,7 @@ class Track(Collection):
class Episode(Collection):
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):
super().__init__(b62_id, api, config)
super().__init__()
metadata = [MetadataEntry("key", b62_id)]
self.playables.append(
PlayableData(

View file

@ -117,3 +117,13 @@ class LocalFile:
f.save()
except OggVorbisHeaderError:
pass # Thrown when using untranscoded file, nothing breaks.
def get_metadata(self, tag: str) -> str:
"""
Gets metadata from file
Args:
tag: metadata tag to be retrieved
"""
f = load_file(self.__path)
return f[tag].value

View file

@ -89,6 +89,8 @@ class Playable:
file_path = library.joinpath(output).expanduser()
file_path = Path(f'{file_path}.{ext}')
if file_path.exists() and not replace:
f = LocalFile(file_path)
f.write_metadata(self.metadata)
raise FileExistsError("File already downloaded")
else:
file_path.parent.mkdir(parents=True, exist_ok=True)

View file

@ -3,6 +3,7 @@ from enum import Enum, IntEnum
from pathlib import Path
from re import IGNORECASE, sub
from typing import Any, NamedTuple
from dataclasses import dataclass, field
from librespot.audio.decoders import AudioQuality
from librespot.util import Base62
@ -110,12 +111,14 @@ class PlayableType(Enum):
EPISODE = "episode"
class PlayableData(NamedTuple):
@dataclass
class PlayableData():
type: PlayableType
id: str
library: Path
output_template: str
metadata: list[MetadataEntry] = []
metadata: list[MetadataEntry] = field(default_factory=list)
existing: bool = False
class OptionalOrFalse(Action):