diff options
Diffstat (limited to '.bin/music')
| -rwxr-xr-x | .bin/music | 175 | 
1 files changed, 123 insertions, 52 deletions
| @@ -1,63 +1,37 @@  #!/usr/bin/python3 +import logging  import os  import sys -import yt_dlp -from dataclasses import dataclass - - -def _match_filter(info: dict, *, incomplete) -> str | None: -    _duration = info.get("duration") -    _duration_min = 60 - -    if _duration and int(_duration) < _duration_min: -        return "Duration too short: < _duration_min" -    return None +import yt_dlp  # type: ignore[import] +from dataclasses import dataclass -@dataclass(frozen=True) -class Collection: -    """A music collection.""" - -    title: str -    links: frozenset[str] - -    def __eq__(self, other) -> bool: -        if isinstance(other, Collection): -            return self.title == other.title -        raise NotImplementedError - +logging.basicConfig() +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) -def parse_raw_to_collections(raw_data: list[str]) -> frozenset[Collection]: -    collections: set[Collection] = set() -    _collection_data: list[str] = [] -    for index, line in enumerate(raw_data): -        if line.startswith("#"): -            continue -        elif line == "" or index + 1 == len(raw_data): -            if len(_collection_data) == 0: -                continue +def get_ytdlp_options(output_dir: str) -> dict: +    """yt_dlp download and convertion options.""" -            collections.add( -                Collection(_collection_data[0], frozenset(_collection_data[1:])) -            ) -            _collection_data.clear() -        else: -            _collection_data.append(line) +    def match_filter(info: dict, *, incomplete) -> str | None: +        duration = info.get("duration") +        duration_min = 60 -    return frozenset(collections) +        if duration is not None and int(duration) < duration_min: +            return "Duration too short: < _duration_min" +        return None -def get_ytdlp_options(output_dir: str) -> dict:      return {          "format": "bestaudio/best", -        "match_filter": _match_filter, +        "match_filter": match_filter,          "postprocessors": [              {                  "key": "FFmpegExtractAudio", -                #"preferredcodec": "m4a", +                # "preferredcodec": "m4a",              },              {                  "key": "FFmpegMetadata", @@ -71,31 +45,128 @@ def get_ytdlp_options(output_dir: str) -> dict:          "outtmpl": f"{output_dir}/%(title)s.%(ext)s",          "restrictfilenames": True,          "ignoreerrors": True, +        "writethumbnail": True,      } -def download_collection(collection: Collection, parent_dir: str) -> None: -    output_dir = os.path.join(parent_dir, collection.title) +def parse_raw_lines(lines: list[str]) -> list[list[str]]: +    """Parse collections of name + link(s) + +    (Usually stored in a text file). +    """ +    entries: list[list[str]] = list() +    entry: list[str] = list() + +    for index, line in enumerate(lines): + +        # entries are separated by an empty line. +        if line == "": +            entries.append(entry) +            entry = list() +            continue + +        entry.append(line) -    if os.path.isdir(output_dir): -        return +        # handle the last entry when reaching the end of the file. +        if index + 1 == len(lines): +            entries.append(entry) +            entry = list() -    os.makedirs(output_dir, exist_ok=True) +    return entries -    with yt_dlp.YoutubeDL(get_ytdlp_options(output_dir)) as downloader: -        downloader.download(collection.links) + +@dataclass(frozen=True) +class Link: +    """A music link.""" + +    url: str +    is_enabled: bool + + +@dataclass(frozen=True) +class Collection: +    """A music collection.""" + +    name: str +    links: tuple[Link, ...] +    is_enabled: bool + +    def __eq__(self, other) -> bool: +        if isinstance(other, Collection): +            return self.name == other.name + +        raise NotImplementedError + + +def sanitize_entry_informations( +    entry: str, indicator: str = "#" +) -> tuple[str, bool]: + +    is_comment = entry.startswith(indicator) + +    if is_comment: +        entry = entry.split(indicator, 1)[1].lstrip() + +    return entry, not is_comment + + +def create_link(entry: str) -> Link: +    url, is_enabled = sanitize_entry_informations(entry) +    return Link(url=url, is_enabled=is_enabled) + + +def create_collection(entry: list[str]) -> Collection: +    """Create a collection from a raw entry.""" +    name, is_enabled = sanitize_entry_informations(entry[0]) +    links = [create_link(_link) for _link in entry[1:]] + +    return Collection( +        name=name, +        links=tuple(links), +        is_enabled=is_enabled +    ) + + +def get_collection_dir(collection: Collection, parent_dir: str) -> str: +    return os.path.join(parent_dir, collection.name) + + +def download_collection(collection: Collection, directory: str) -> None: +    """Download a music collection to the local filesystem.""" + +    # create directory and download/convert with opinionated settings. +    os.makedirs(directory, exist_ok=True) + +    with yt_dlp.YoutubeDL(get_ytdlp_options(directory)) as downloader: +        for link in collection.links: +            if not link.is_enabled: +                logger.info(f"Skipping {collection.name}, {link}") +                continue + +            logger.info(f"Downloading {collection.name}, {link}") +            downloader.download(link.url)  def main() -> int: -    # input handling +    """Main entrypoint.""" + +    # argument handling      if len(sys.argv) != 2:          return 1      with open(sys.argv[1], "r") as file:          filedata = file.read().splitlines() -    for collection in parse_raw_to_collections(filedata): -        download_collection(collection, os.getcwd()) +    for entry in parse_raw_lines(filedata): +        collection = create_collection(entry) +        output_dir = get_collection_dir(collection, os.getcwd()) + +        if os.path.isdir(output_dir) or not collection.is_enabled: +            logger.info(f"Skipping {collection.name}") +            continue + +        logger.info(f"Handling {collection.name}") +        download_collection(collection, output_dir)      return 0 |