From 612dbb6993c8a548744f29a4d15d6b31325357c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romain=20Gon=C3=A7alves?= Date: Tue, 14 May 2024 18:53:03 +0200 Subject: Tue May 14 06:53:03 PM CEST 2024 --- .bin/music | 175 +++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 123 insertions(+), 52 deletions(-) (limited to '.bin/music') diff --git a/.bin/music b/.bin/music index a14aae9..0c280ad 100755 --- a/.bin/music +++ b/.bin/music @@ -1,63 +1,37 @@ #!/usr/bin/python3 +import logging import os import sys -import yt_dlp -from dataclasses import dataclass - - -def _match_filter(info: dict, *, incomplete) -> str | None: - _duration = info.get("duration") - _duration_min = 60 - - if _duration and int(_duration) < _duration_min: - return "Duration too short: < _duration_min" - return None +import yt_dlp # type: ignore[import] +from dataclasses import dataclass -@dataclass(frozen=True) -class Collection: - """A music collection.""" - - title: str - links: frozenset[str] - - def __eq__(self, other) -> bool: - if isinstance(other, Collection): - return self.title == other.title - raise NotImplementedError - +logging.basicConfig() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) -def parse_raw_to_collections(raw_data: list[str]) -> frozenset[Collection]: - collections: set[Collection] = set() - _collection_data: list[str] = [] - for index, line in enumerate(raw_data): - if line.startswith("#"): - continue - elif line == "" or index + 1 == len(raw_data): - if len(_collection_data) == 0: - continue +def get_ytdlp_options(output_dir: str) -> dict: + """yt_dlp download and convertion options.""" - collections.add( - Collection(_collection_data[0], frozenset(_collection_data[1:])) - ) - _collection_data.clear() - else: - _collection_data.append(line) + def match_filter(info: dict, *, incomplete) -> str | None: + duration = info.get("duration") + duration_min = 60 - return frozenset(collections) + if duration is not None and int(duration) < duration_min: + return "Duration too short: < _duration_min" + return None -def get_ytdlp_options(output_dir: str) -> dict: return { "format": "bestaudio/best", - "match_filter": _match_filter, + "match_filter": match_filter, "postprocessors": [ { "key": "FFmpegExtractAudio", - #"preferredcodec": "m4a", + # "preferredcodec": "m4a", }, { "key": "FFmpegMetadata", @@ -71,31 +45,128 @@ def get_ytdlp_options(output_dir: str) -> dict: "outtmpl": f"{output_dir}/%(title)s.%(ext)s", "restrictfilenames": True, "ignoreerrors": True, + "writethumbnail": True, } -def download_collection(collection: Collection, parent_dir: str) -> None: - output_dir = os.path.join(parent_dir, collection.title) +def parse_raw_lines(lines: list[str]) -> list[list[str]]: + """Parse collections of name + link(s) + + (Usually stored in a text file). + """ + entries: list[list[str]] = list() + entry: list[str] = list() + + for index, line in enumerate(lines): + + # entries are separated by an empty line. + if line == "": + entries.append(entry) + entry = list() + continue + + entry.append(line) - if os.path.isdir(output_dir): - return + # handle the last entry when reaching the end of the file. + if index + 1 == len(lines): + entries.append(entry) + entry = list() - os.makedirs(output_dir, exist_ok=True) + return entries - with yt_dlp.YoutubeDL(get_ytdlp_options(output_dir)) as downloader: - downloader.download(collection.links) + +@dataclass(frozen=True) +class Link: + """A music link.""" + + url: str + is_enabled: bool + + +@dataclass(frozen=True) +class Collection: + """A music collection.""" + + name: str + links: tuple[Link, ...] + is_enabled: bool + + def __eq__(self, other) -> bool: + if isinstance(other, Collection): + return self.name == other.name + + raise NotImplementedError + + +def sanitize_entry_informations( + entry: str, indicator: str = "#" +) -> tuple[str, bool]: + + is_comment = entry.startswith(indicator) + + if is_comment: + entry = entry.split(indicator, 1)[1].lstrip() + + return entry, not is_comment + + +def create_link(entry: str) -> Link: + url, is_enabled = sanitize_entry_informations(entry) + return Link(url=url, is_enabled=is_enabled) + + +def create_collection(entry: list[str]) -> Collection: + """Create a collection from a raw entry.""" + name, is_enabled = sanitize_entry_informations(entry[0]) + links = [create_link(_link) for _link in entry[1:]] + + return Collection( + name=name, + links=tuple(links), + is_enabled=is_enabled + ) + + +def get_collection_dir(collection: Collection, parent_dir: str) -> str: + return os.path.join(parent_dir, collection.name) + + +def download_collection(collection: Collection, directory: str) -> None: + """Download a music collection to the local filesystem.""" + + # create directory and download/convert with opinionated settings. + os.makedirs(directory, exist_ok=True) + + with yt_dlp.YoutubeDL(get_ytdlp_options(directory)) as downloader: + for link in collection.links: + if not link.is_enabled: + logger.info(f"Skipping {collection.name}, {link}") + continue + + logger.info(f"Downloading {collection.name}, {link}") + downloader.download(link.url) def main() -> int: - # input handling + """Main entrypoint.""" + + # argument handling if len(sys.argv) != 2: return 1 with open(sys.argv[1], "r") as file: filedata = file.read().splitlines() - for collection in parse_raw_to_collections(filedata): - download_collection(collection, os.getcwd()) + for entry in parse_raw_lines(filedata): + collection = create_collection(entry) + output_dir = get_collection_dir(collection, os.getcwd()) + + if os.path.isdir(output_dir) or not collection.is_enabled: + logger.info(f"Skipping {collection.name}") + continue + + logger.info(f"Handling {collection.name}") + download_collection(collection, output_dir) return 0 -- cgit v1.2.3