Added --skip-duplicate implementation

This commit is contained in:
DraftKinner 2025-02-09 16:41:14 -05:00
parent 802b5b5928
commit 08c6fb911e
3 changed files with 77 additions and 26 deletions

View file

@ -147,6 +147,7 @@ class App:
def __init__(self, args: Namespace):
self.__config = Config(args)
self.__existing = {}
self.__duplicates = {}
Logger(self.__config)
# Create session
@ -181,11 +182,8 @@ class App:
Logger.log(LogChannel.ERRORS, str(e))
exit(1)
if len(collections) > 0:
self.scan(
collections,
self.__config.skip_previous,
self.__config.skip_duplicates,
)
with Loader("Scanning collections..."):
self.scan(collections)
self.download_all(collections)
else:
Logger.log(LogChannel.WARNINGS, "there is nothing to do")
@ -246,20 +244,20 @@ class App:
raise ParseError(f'Unsupported content type "{id_type}"')
return collections
def scan(
self,
collections: list[Collection],
skip_previous: bool,
skip_duplicate: bool,
):
if skip_previous:
def scan(self, collections: list[Collection]):
if self.__config.skip_previous:
for collection in collections:
existing = collection.get_existing(
self.__config.audio_format.value.ext
)
existing = collection.get_existing(self.__config.audio_format.value.ext)
self.__existing.update(existing)
if skip_duplicate:
pass
if self.__config.skip_duplicates:
for collection in collections:
duplicates = collection.get_duplicates(
self.__config.audio_format.value.ext,
self.__config.album_library,
self.__config.playlist_library,
self.__config.podcast_library,
)
self.__duplicates.update(duplicates)
def download_all(self, collections: list[Collection]) -> None:
count = 0
@ -268,6 +266,13 @@ class App:
for playable in collection.playables:
count += 1
# Skip duplicates and previously downloaded
if playable.duplicate:
Logger.log(
LogChannel.SKIPS,
f'Skipping "{self.__duplicates[playable.id]}": Duplicated from another collection',
)
continue
if playable.existing:
Logger.log(
LogChannel.SKIPS,
@ -285,7 +290,7 @@ class App:
except RuntimeError as err:
Logger.log(
LogChannel.SKIPS,
f'Skipping track id = {playable.id}: {err}',
f"Skipping track id = {playable.id}: {err}",
)
continue
elif playable.type == PlayableType.EPISODE:
@ -319,7 +324,9 @@ class App:
desc=f"({count}/{total}) {track.name}",
total=track.input_stream.size,
) as p_bar:
file = track.write_audio_stream(output, p_bar, self.__config.download_real_time)
file = track.write_audio_stream(
output, p_bar, self.__config.download_real_time
)
# Download lyrics
if playable.type == PlayableType.TRACK and self.__config.lyrics_file:

View file

@ -23,10 +23,9 @@ from zotify.utils import (
class Collection:
def __init__(self):
self.playables: list[PlayableData] = []
self.path: Path = None
def get_existing(self, ext: str) -> dict[str, str]:
existing: dict[str, str] = {}
def set_path(self):
meta_tags = ["album_artist", "album", "podcast", "playlist"]
library = Path(self.playables[0].library)
output = self.playables[0].output_template
@ -38,10 +37,16 @@ class Collection:
"{" + meta.name + "}", fix_filename(meta.string)
)
collection_path = library.joinpath(output).expanduser()
if collection_path.parent.exists():
self.path = library.joinpath(output).expanduser().parent
def get_existing(self, ext: str) -> dict[str, str]:
existing: dict[str, str] = {}
if self.path is None:
self.set_path()
if self.path.exists():
file_path = "*.{}".format(ext)
scan_path = str(collection_path.parent.joinpath(file_path))
scan_path = str(self.path.joinpath(file_path))
# Check contents of path
for file in iglob(scan_path):
@ -55,6 +60,44 @@ class Collection:
return existing
def get_duplicates(
    self, ext: str, album_lib: Path, playlist_lib: Path, podcast_lib: Path
) -> dict[str, str]:
    """Flag playables that already exist elsewhere in the given libraries.

    Recursively scans the album, playlist and podcast libraries for files
    ending in ``ext``, ignoring files that sit directly inside this
    collection's own directory (``self.path``). Every playable whose ``id``
    equals the ``"key"`` metadata of a scanned file gets ``duplicate`` set
    to ``True``.

    Args:
        ext: File extension to look for, without the leading dot.
        album_lib: Root directory of the album library.
        playlist_lib: Root directory of the playlist library.
        podcast_lib: Root directory of the podcast library.

    Returns:
        Mapping of playable id to the stem (file name without extension)
        of the file found for it outside this collection's directory.
    """
    if self.path is None:
        self.set_path()

    pattern = "**/*.{}".format(ext)
    # Several libraries may point at the same root; dict.fromkeys drops the
    # repeats while keeping the original scan order.
    libraries = dict.fromkeys((album_lib, playlist_lib, podcast_lib))

    existing: dict[str, str] = {}
    for library in libraries:
        for file in iglob(str(library.joinpath(pattern)), recursive=True):
            f_path = Path(file)
            # Compare directories instead of using Path.match(): match()
            # treats "[", "]", "*" and "?" in the collection's folder name
            # as glob syntax, so e.g. "Album [Deluxe]" never matched itself
            # and the collection's own files were counted as duplicates.
            if f_path.parent == self.path:
                continue
            f = LocalFile(f_path)
            existing[f.get_metadata("key")] = f_path.stem

    duplicates: dict[str, str] = {}
    for playable in self.playables:
        if playable.id in existing:
            playable.duplicate = True
            duplicates[playable.id] = existing[playable.id]
    return duplicates
class Album(Collection):
def __init__(self, b62_id: str, api: ApiClient, config: Config = Config()):

View file

@ -112,13 +112,14 @@ class PlayableType(Enum):
@dataclass
class PlayableData():
class PlayableData:
type: PlayableType
id: str
library: Path
output_template: str
metadata: list[MetadataEntry] = field(default_factory=list)
existing: bool = False
duplicate: bool = False
class OptionalOrFalse(Action):