Files
picard-discogs-genre/__init__.py
2026-03-13 20:27:50 -04:00

464 lines
17 KiB
Python

import re
import urllib.parse
import functools
import unicodedata
from PyQt5 import QtWidgets, QtCore
from picard import log, config
from picard.webservice import ratecontrol
from picard.metadata import register_album_metadata_processor
from picard.ui.options import register_options_page, OptionsPage
from picard.ui.itemviews import register_album_action, BaseAction
from picard.album import Album
from picard.metadata import Metadata
from .constants import *
class DiscogsGenreOptionsPage(OptionsPage):
    """Picard options page for the Discogs genre/style plugin.

    The page edits three settings:

    * ``discogs_personal_access_token`` - optional Discogs token.
    * ``discogs_style_tag`` - metadata tag that receives Discogs styles;
      an empty value disables style tagging.
    * ``discogs_minimum_token_overlap`` - percentage (0-100) used when
      deciding whether a Discogs search result matches.
    """

    NAME = "discogs_genre_and_style"
    TITLE = "Discogs Genre & Style"
    PARENT = "plugins"
    options = CONFIG_OPTIONS

    # Fallbacks used only when a setting holds no value at all (None).
    _DEFAULT_STYLE_TAG = "grouping"
    _DEFAULT_MIN_OVERLAP = 80

    def __init__(self, parent=None) -> None:
        super().__init__(parent)
        self.setup_ui()

    def setup_ui(self) -> None:
        """Build the widgets: token field, style-tag field and overlap spin box."""
        layout = QtWidgets.QVBoxLayout(self)

        options_group = QtWidgets.QGroupBox("Options", self)
        options_group.setSizePolicy(QtWidgets.QSizePolicy.Preferred, QtWidgets.QSizePolicy.Minimum)
        options_layout = QtWidgets.QVBoxLayout(options_group)

        self.token_input = QtWidgets.QLineEdit(self)
        self.token_input.setPlaceholderText("Account > Settings > Developers > Generate token")

        self.style_input = QtWidgets.QLineEdit(self)
        self.style_input.setPlaceholderText("grouping")
        self.style_input.setToolTip("Metadata tag to store Discogs style (default: grouping), set it blank to disable")

        # Label on the left, spin box pushed to the right by the stretch.
        min_overlap_layout = QtWidgets.QHBoxLayout()
        min_overlap_label = QtWidgets.QLabel("Minimum Token Overlap", self)
        min_overlap_label.setToolTip("Minimum percentage of token overlap required to consider a Discogs search result as a match (0-100%)")
        min_overlap_label.setSizePolicy(QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed)
        self.min_overlap_input = QtWidgets.QSpinBox(self)
        self.min_overlap_input.setRange(0, 100)
        self.min_overlap_input.setSuffix(" %")
        self.min_overlap_input.setSizePolicy(QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Preferred)
        min_overlap_layout.addWidget(min_overlap_label)
        min_overlap_layout.addStretch()
        min_overlap_layout.addWidget(self.min_overlap_input)

        options_layout.addWidget(QtWidgets.QLabel("Personal Access Token (recommended for higher rate limits)", self))
        options_layout.addWidget(self.token_input)
        options_layout.addWidget(QtWidgets.QLabel("Style Tag", self))
        options_layout.addWidget(self.style_input)
        options_layout.addLayout(min_overlap_layout)

        layout.addWidget(options_group)
        layout.addStretch()

    def load(self) -> None:
        """Populate the widgets from the stored settings.

        Only a missing value (``None``) is replaced by a fallback. An empty
        style tag and an overlap of 0 are legitimate user choices ("style
        tagging disabled" and "accept any result") and must be shown as
        stored; otherwise the next save would silently overwrite them.
        """
        self.token_input.setText(config.setting["discogs_personal_access_token"] or "")

        style_tag = config.setting["discogs_style_tag"]
        if style_tag is None:
            style_tag = self._DEFAULT_STYLE_TAG
        self.style_input.setText(style_tag)

        min_overlap = config.setting["discogs_minimum_token_overlap"]
        if min_overlap is None:
            min_overlap = self._DEFAULT_MIN_OVERLAP
        self.min_overlap_input.setValue(min_overlap)

    def save(self) -> None:
        """Write the widget values back to the settings (text is stripped)."""
        config.setting["discogs_personal_access_token"] = self.token_input.text().strip()
        config.setting["discogs_style_tag"] = self.style_input.text().strip()
        config.setting["discogs_minimum_token_overlap"] = self.min_overlap_input.value()
class DiscogsGenreProcessor:
    """Album metadata processor that copies genres and styles from Discogs.

    ``process_album`` first looks for a Discogs release/master URL among the
    MusicBrainz relations and fetches that entity directly; otherwise it
    falls back to the Discogs search API and validates the results against
    the MusicBrainz artist and title.

    Requests are asynchronous. Each pending request is counted in
    ``album._requests``; the response handlers decrement the counter and call
    ``album._finalize_loading`` once it reaches zero.

    Project types (``Album``, ``Metadata``) are referenced through string
    annotations so they are not evaluated when the class is defined.
    """

    # Percentage used when the overlap setting holds no value at all.
    DEFAULT_MIN_OVERLAP_PERCENT = 80

    def __init__(self):
        self.host = "api.discogs.com"

    # ------------------------------------------------------------------
    # Tag writing
    # ------------------------------------------------------------------

    def _add_discogs_tags(self, metadata: "Metadata", genres: list[str], styles: list[str]) -> None:
        """Replace the ``genre`` tag, and the configured style tag, on *metadata*.

        The existing ``genre`` values are always removed first. Styles are
        written only when the ``discogs_style_tag`` setting is non-empty.
        """
        metadata.delete('genre')
        for genre in genres:
            metadata.add('genre', genre)

        style_tag = config.setting["discogs_style_tag"]
        if not style_tag:
            # None or "" means style tagging is disabled.
            return
        metadata.delete(style_tag)
        for style in styles:
            metadata.add(style_tag, style)

    def _propagate_tags_to_album_tracks(self, album: "Album", genres: list[str], styles: list[str]) -> None:
        """Apply the tags to every track of *album* and to each track's files."""
        for track in album.tracks:
            self._add_discogs_tags(track.metadata, genres, styles)
            for file in track.files:
                self._add_discogs_tags(file.metadata, genres, styles)
                file.update()
            track.update()

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------

    def _clean_search_text(self, text: str) -> str:
        """Return *text* NFKC-normalized, stripped, with whitespace runs collapsed."""
        if not text:
            return ""
        text = unicodedata.normalize("NFKC", text).strip()
        return re.sub(r"\s+", " ", text)

    def _normalize_text(self, text: str) -> str:
        """Return a comparison key for *text*.

        The text is case-folded, combining marks are removed, every
        non-alphanumeric character becomes a space and whitespace is
        collapsed, e.g. ``"Beyoncé – Lemonade!"`` -> ``"beyonce lemonade"``.
        """
        if not text:
            return ""
        text = unicodedata.normalize("NFKC", text).casefold()
        text = unicodedata.normalize("NFKD", text)
        text = "".join(ch for ch in text if unicodedata.category(ch) != "Mn")
        text = "".join(ch if ch.isalnum() else " " for ch in text)
        return re.sub(r"\s+", " ", text).strip()

    def _collapse_search_tokens(self, text: str) -> str:
        """Return *text* without combining marks and punctuation.

        Unlike ``_normalize_text`` the case is kept and punctuation is
        dropped rather than replaced by a space (``"AC/DC"`` -> ``"ACDC"``).
        """
        if not text:
            return ""
        text = unicodedata.normalize("NFKD", text)
        text = "".join(ch for ch in text if unicodedata.category(ch) != "Mn")
        text = "".join(ch for ch in text if ch.isalnum() or ch.isspace())
        return re.sub(r"\s+", " ", text).strip()

    def _token_overlap(self, left: str, right: str) -> float:
        """Return the fraction of *left*'s distinct tokens that occur in *right*.

        The measure is asymmetric (the denominator is the number of distinct
        tokens in *left*). Returns 0.0 when either side has no tokens.
        """
        left_tokens = set(left.split())
        right_tokens = set(right.split())
        if not left_tokens or not right_tokens:
            return 0.0
        return len(left_tokens & right_tokens) / len(left_tokens)

    @staticmethod
    def _overlap_threshold(raw_percent) -> float:
        """Convert a stored overlap percentage into a 0.0-1.0 ratio.

        Only ``None`` falls back to ``DEFAULT_MIN_OVERLAP_PERCENT``; a stored
        0 is a valid choice and yields 0.0 instead of being replaced.
        """
        if raw_percent is None:
            raw_percent = DiscogsGenreProcessor.DEFAULT_MIN_OVERLAP_PERCENT
        return raw_percent / 100

    # ------------------------------------------------------------------
    # Search query construction
    # ------------------------------------------------------------------

    def _artist_search_variants(self, artist: str) -> list[str]:
        """Return distinct spellings of *artist* to try, most literal first.

        Order: the cleaned name, the punctuation-free name, the
        punctuation-free name without one-character tokens and its longest
        token, then the normalized name without one-character tokens and its
        longest token.
        """
        variants: list[str] = []

        def add_variant(value: str) -> None:
            cleaned = self._clean_search_text(value)
            if cleaned and cleaned not in variants:
                variants.append(cleaned)

        collapsed = self._collapse_search_tokens(artist)
        add_variant(artist)
        add_variant(collapsed)

        normalized_tokens = [tok for tok in self._normalize_text(artist).split() if len(tok) > 1]
        collapsed_tokens = [tok for tok in collapsed.split() if len(tok) > 1]
        for tokens in (collapsed_tokens, normalized_tokens):
            if tokens:
                add_variant(" ".join(tokens))
            if len(tokens) > 1:
                add_variant(max(tokens, key=len))
        return variants

    def _build_search_attempts(self, artists: list[str], title: str, token: str) -> list[dict[str, str]]:
        """Return the ordered, de-duplicated query-parameter dicts to try.

        Only the first entry of *artists* is used for the ``artist``
        parameter. When *token* is non-empty it is added to every query.
        """
        artist_variants = self._artist_search_variants(artists[0]) if artists else []
        title_variants = [self._clean_search_text(title)]
        collapsed_title = self._collapse_search_tokens(title)
        if collapsed_title and collapsed_title not in title_variants:
            title_variants.append(collapsed_title)

        attempts: list[dict[str, str]] = []
        for artist_variant in artist_variants or [""]:
            for title_variant in title_variants:
                query_params = {
                    'artist': artist_variant,
                    'release_title': title_variant,
                    'type': 'master',
                }
                if token:
                    query_params['token'] = token
                if query_params not in attempts:
                    attempts.append(query_params)
        return attempts

    def _entity_url(self, entity_type: str, entity_id: str, token: str) -> str:
        """Return the API URL of a Discogs release/master, URL-encoding *token*."""
        path = f"/{entity_type}s/{entity_id}"
        if token:
            path += "?" + urllib.parse.urlencode({'token': token})
        return f"https://{self.host}{path}"

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def _apply_rate_limit(self, token):
        """Set the minimum delay between requests to the Discogs host."""
        # 60 req/min with token (1000ms), 25 req/min without (2400ms)
        delay = 1000 if token else 2400
        ratecontrol.set_minimum_delay((self.host, 443), delay)

    @staticmethod
    def _find_discogs_url(release: dict) -> "str | None":
        """Return the Discogs URL from the release relations, else the release group's.

        For each source only the first relation of type ``discogs`` is
        considered, matching a first-hit-then-stop scan.
        """
        for source in (release, release.get('release-group', {})):
            for rel in source.get('relations', []):
                if rel.get('type') == 'discogs':
                    url = rel.get('url', {}).get('resource')
                    if url:
                        return url
                    break
        return None

    def process_album(self, album: "Album", metadata: "Metadata", release: dict):
        """Start the Discogs lookup for *album*.

        A linked Discogs release/master is fetched directly. Otherwise, when
        both an artist credit and an album title are available, the search
        API is queried. ``album._requests`` is incremented before each
        request is issued.
        """
        token = (config.setting["discogs_personal_access_token"] or "").strip()
        self._apply_rate_limit(token)

        discogs_url = self._find_discogs_url(release)
        if discogs_url:
            match = re.search(r'/(release|master)/(\d+)', discogs_url)
            if match:
                entity_type, entity_id = match.group(1), match.group(2)
                album._requests += 1
                self.fetch_discogs_tags(album, metadata, entity_type, entity_id, token)
                return

        # No usable link: search by the release-group credits, falling back
        # to the release credits.
        rg_credits = release.get('release-group', {}).get('artist-credit', [])
        credits = rg_credits or release.get('artist-credit', [])
        artists = [
            c.get('name') or c.get('artist', {}).get('name', '')
            for c in credits
            if isinstance(c, dict)
        ]
        title = metadata.get('album')
        if artists and title:
            album._requests += 1
            self.search_discogs(album, metadata, artists, title, token)

    # ------------------------------------------------------------------
    # Search flow
    # ------------------------------------------------------------------

    def search_discogs(self, album: "Album", metadata: "Metadata", artists: list[str], title: str, token: str):
        """Run the first search attempt; release the pending request if there is none."""
        attempts = self._build_search_attempts(artists, title, token)
        if not attempts:
            album._requests -= 1
            if album._requests == 0:
                album._finalize_loading(None)
            return
        self._perform_search_attempt(album, metadata, artists, title, attempts, 0)

    def _perform_search_attempt(
        self,
        album: "Album",
        metadata: "Metadata",
        artists: list[str],
        title: str,
        attempts: list[dict[str, str]],
        attempt_index: int,
    ) -> None:
        """Issue the search request for ``attempts[attempt_index]``."""
        query = urllib.parse.urlencode(attempts[attempt_index])
        full_url = f"https://{self.host}/database/search?{query}"
        album.tagger.webservice.get_url(  # type: ignore
            url=full_url,
            parse_response_type="json",
            handler=functools.partial(
                self.handle_search_response,
                album,
                metadata,
                artists,
                title,
                attempts,
                attempt_index,
            ),
        )

    def handle_search_response(
        self,
        album: "Album",
        metadata: "Metadata",
        artists: list[str],
        title: str,
        attempts: list[dict[str, str]],
        attempt_index: int,
        response,
        _reply,
        error,
    ):
        """Handle a search reply: retry with the next attempt or apply the tags.

        An empty result list triggers the next attempt, if any. An error or
        empty response is logged and ends the search. The ``finally`` clause
        always releases the request that produced this reply; a retry has
        already added its own count before that.
        """
        try:
            if error or not response:
                log.error("Discogs Search API failed: %s", error)
                return

            results = response.get('results', [])
            next_index = attempt_index + 1
            if not results and next_index < len(attempts):
                current, upcoming = attempts[attempt_index], attempts[next_index]
                log.debug(
                    "Discogs search returned no results for artist=%r title=%r, retrying with artist=%r title=%r",
                    current.get('artist', ''),
                    current.get('release_title', ''),
                    upcoming.get('artist', ''),
                    upcoming.get('release_title', ''),
                )
                album._requests += 1
                self._perform_search_attempt(album, metadata, artists, title, attempts, next_index)
                return

            valid_result = self.validate_search_results(artists, title, results)
            if valid_result:
                genres = valid_result.get('genre', [])
                styles = valid_result.get('style', [])
                self._add_discogs_tags(metadata, genres, styles)
        finally:
            album._requests -= 1
            if album._requests == 0:
                album._finalize_loading(None)

    def validate_search_results(
        self,
        mb_artists: list[str],
        mb_title: str,
        results: list,
        *,
        min_overlap: "float | None" = None,
    ):
        """Return the first result matching the MusicBrainz artists and title.

        Result titles are expected in the form ``"Artist(s) - Release"``.

        Args:
            mb_artists: MusicBrainz artist names; empty names are ignored.
            mb_title: MusicBrainz album title.
            results: Result dicts from the Discogs search response.
            min_overlap: Required token-overlap ratio (0.0-1.0). When omitted
                it is derived from ``discogs_minimum_token_overlap``; a
                configured 0 is honoured rather than replaced by the default.

        Returns:
            The matching result dict, or ``None`` when nothing matches.
        """
        if min_overlap is None:
            # Read once instead of once per comparison.
            min_overlap = self._overlap_threshold(config.setting["discogs_minimum_token_overlap"])

        norm_mb_artists = [self._normalize_text(artist) for artist in mb_artists if artist]
        norm_mb_title = self._normalize_text(mb_title)

        for result in results:
            raw_title = result.get('title', '') or ''
            norm_full_title = self._normalize_text(raw_title)
            # "Artist(s) - Release" in search results
            parts = raw_title.split(' - ', 1)
            if len(parts) > 1:
                norm_dc_artist = self._normalize_text(parts[0])
                norm_dc_release = self._normalize_text(parts[1])
            else:
                norm_dc_artist = norm_dc_release = norm_full_title

            title_match = (
                norm_mb_title == norm_dc_release
                or self._token_overlap(norm_mb_title, norm_dc_release) >= min_overlap
                or self._token_overlap(norm_mb_title, norm_full_title) >= min_overlap
            )
            if not title_match:
                continue

            artists_match = all(
                artist in norm_dc_artist
                or artist in norm_full_title
                or self._token_overlap(artist, norm_dc_artist) >= min_overlap
                for artist in norm_mb_artists
            )
            if artists_match:
                return result
        return None

    # ------------------------------------------------------------------
    # Direct entity fetch
    # ------------------------------------------------------------------

    def fetch_discogs_tags(
        self,
        album: "Album",
        metadata: "Metadata",
        entity_type: str,
        entity_id: str,
        token: str,
        use_album_requests: bool = True,
    ):
        """Request a Discogs release/master and hand the reply to ``handle_tags_response``.

        Args:
            entity_type: ``"release"`` or ``"master"``.
            entity_id: Numeric Discogs id as a string.
            token: Personal access token, or an empty string.
            use_album_requests: ``True`` when the caller has counted this
                request in ``album._requests``; ``False`` for requests made
                outside that accounting, in which case the tags are also
                copied to the album's tracks and files.
        """
        album.tagger.webservice.get_url(  # type: ignore
            url=self._entity_url(entity_type, entity_id, token),
            parse_response_type="json",
            priority=True,
            handler=functools.partial(self.handle_tags_response, album, metadata, use_album_requests),
        )

    def handle_tags_response(self, album: "Album", metadata: "Metadata", use_album_requests: bool, response, reply, error):
        """Apply the genres/styles of a fetched entity and settle the album state."""
        try:
            if error or not response:
                log.error("Discogs Tags API failed: %s", error)
                return
            genres = response.get('genres', [])
            styles = response.get('styles', [])
            self._add_discogs_tags(metadata, genres, styles)
            if not use_album_requests:
                self._propagate_tags_to_album_tracks(album, genres, styles)
        finally:
            if not use_album_requests:
                # Request was not counted in album._requests: just refresh.
                album.update()
            else:
                if album._requests > 0:
                    album._requests -= 1
                else:
                    log.warning("Discogs tags response received with no pending album requests")
                if album._requests == 0:
                    if album.loaded:
                        album.update()
                    else:
                        album._finalize_loading(None)
class DiscogsManualSearchAction(BaseAction):
    """Album action that tags albums from a manually entered Discogs code.

    For every selected album the user is asked for a code such as
    ``[m123456]`` (master) or ``[r123456]`` (release); the matching Discogs
    entity is then fetched and its tags applied to the album.
    """

    NAME = "[Discogs] Manual Search"

    def __init__(self):
        super().__init__()

    def callback(self, objs):
        """Prompt once per selected ``Album`` and start the Discogs fetch.

        Non-album objects are ignored. Cancelling the dialog or leaving it
        empty skips the album; a malformed code shows a warning and skips it.
        """
        selected_albums = [obj for obj in objs if isinstance(obj, Album)]

        for album in selected_albums:
            album_metadata = getattr(album, 'metadata', None)
            title = album_metadata.get('album', '') if album_metadata else ''

            prompt = "Enter Discogs master/release code"
            if title:
                prompt = f"{prompt} for \"{title}\""
            prompt = f"{prompt}\nFound in the top right corner of the page, example: [m123456] or [r123456]"

            parent = QtWidgets.QApplication.activeWindow()
            input_dialog = QtWidgets.QInputDialog(parent)
            input_dialog.setWindowTitle("Discogs Manual Search")
            input_dialog.setLabelText(prompt)
            input_dialog.setInputMode(QtWidgets.QInputDialog.TextInput)
            input_dialog.setTextValue("")

            def focus_input() -> None:
                # Give keyboard focus to the dialog's text field, if present.
                line_edit = input_dialog.findChild(QtWidgets.QLineEdit)
                if line_edit:
                    line_edit.setFocus()

            # Queued with a zero timeout so it runs from the event loop.
            QtCore.QTimer.singleShot(0, focus_input)

            accepted = input_dialog.exec_()
            code = input_dialog.textValue().strip()
            if not accepted or not code:
                continue

            parsed = re.match(r'^\[[mMrR](\d+)\]$', code)
            if parsed is None:
                QtWidgets.QMessageBox.warning(
                    parent,
                    "Invalid code",
                    "Please enter a valid Discogs master/release code, example: [m123456] or [r123456]",
                )
                continue

            entity_id = parsed.group(1)
            # The pattern guarantees code[1] is one of m, M, r, R.
            entity_type = 'master' if code[1] in ('m', 'M') else 'release'
            token = (config.setting["discogs_personal_access_token"] or "").strip()

            processor = DiscogsGenreProcessor()
            processor.fetch_discogs_tags(
                album,
                album.metadata,
                entity_type,
                entity_id,
                token,
                use_album_requests=False,
            )
# Plugin registration: options page, album action and album metadata
# processor, in that order.
register_options_page(DiscogsGenreOptionsPage)

_manual_search_action = DiscogsManualSearchAction()
register_album_action(_manual_search_action)

_genre_processor = DiscogsGenreProcessor()
register_album_metadata_processor(_genre_processor.process_album)