Better replaygain parsing

This commit is contained in:
2025-09-23 19:37:29 -04:00
parent c62f7f9e31
commit a8794fa239

View File

@@ -1,4 +1,5 @@
import os import os
import re
import json import json
import shutil import shutil
import subprocess import subprocess
@@ -8,10 +9,9 @@ import struct
import threading import threading
import concurrent.futures import concurrent.futures
import tempfile import tempfile
import math
from functools import partial from functools import partial
from typing import List, Tuple, Dict, Optional from typing import List, Tuple, Dict, Optional
from picard import config, log from picard import config, log
from picard.ui.itemviews import ( from picard.ui.itemviews import (
BaseAction, BaseAction,
@@ -22,10 +22,7 @@ from picard.track import Track
from picard.album import Album from picard.album import Album
from picard.ui.options import OptionsPage, register_options_page from picard.ui.options import OptionsPage, register_options_page
from picard.util import thread from picard.util import thread
from picard.coverart.image import ( from picard.coverart.image import CoverArtImage
CoverArtImage,
CoverArtImageError,
)
from PyQt5 import QtWidgets, QtCore from PyQt5 import QtWidgets, QtCore
_analysis_semaphore = None _analysis_semaphore = None
@@ -671,7 +668,7 @@ class AcousticBrainzNG:
if not ffmpeg_path: if not ffmpeg_path:
raise ValueError("FFmpeg path not configured") raise ValueError("FFmpeg path not configured")
replaygain_lufs_result = subprocess.run( replaygain_proc = subprocess.run(
[ffmpeg_path, "-hide_banner", "-i", file_path, "-af", f"loudnorm=I={config.setting['acousticbrainz_ng_replaygain_reference_loudness']}:print_format=json", "-f", "null", "-"], [ffmpeg_path, "-hide_banner", "-i", file_path, "-af", f"loudnorm=I={config.setting['acousticbrainz_ng_replaygain_reference_loudness']}:print_format=json", "-f", "null", "-"],
capture_output=True, capture_output=True,
text=True, text=True,
@@ -679,40 +676,66 @@ class AcousticBrainzNG:
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
) )
if replaygain_lufs_result.returncode != 0: if replaygain_proc.returncode != 0:
log.error(f"FFmpeg failed for ReplayGain LUFS calculation on file {file_path} with exit code {replaygain_lufs_result.returncode}") log.error(f"FFmpeg failed for ReplayGain LUFS calculation on file {file_path} with exit code {replaygain_proc.returncode}")
if replaygain_lufs_result.stdout: if replaygain_proc.stdout:
log.error(f"FFmpeg stdout: {replaygain_lufs_result.stdout}") log.error(f"FFmpeg stdout: {replaygain_proc.stdout}")
if replaygain_lufs_result.stderr: if replaygain_proc.stderr:
log.error(f"FFmpeg stderr: {replaygain_lufs_result.stderr}") log.error(f"FFmpeg stderr: {replaygain_proc.stderr}")
return {} return {}
replaygain_log = replaygain_proc.stderr or replaygain_proc.stdout
replaygain_match = re.search(r'\{.*?\}', replaygain_log, re.S)
replaygain_matches = re.findall(r'\{.*?\}', replaygain_log, re.S) if not replaygain_match else None
replaygain_json_text = replaygain_match.group(0) if replaygain_match else (replaygain_matches[0] if replaygain_matches else None)
replaygain_gain = None replaygain_gain = None
replaygain_peak = None replaygain_peak = None
replaygain_range = None replaygain_range = None
try: replaygain_lufs_result: dict | None = None
json_start = replaygain_lufs_result.stderr.find('{')
if json_start != -1: if replaygain_json_text:
json_str = replaygain_lufs_result.stderr[json_start:] try:
json_end = json_str.find('}') + 1 replaygain_lufs_result = json.loads(replaygain_json_text)
if json_end > 0: except json.JSONDecodeError:
loudnorm_data = json.loads(json_str[:json_end]) if replaygain_matches:
input_i = loudnorm_data.get('input_i') try:
input_tp = loudnorm_data.get('input_tp') replaygain_lufs_result = json.loads(replaygain_matches[-1])
input_lra = loudnorm_data.get('input_lra') except Exception:
replaygain_lufs_result = None
if input_i and input_i != "-inf":
replaygain_gain = f"{(config.setting['acousticbrainz_ng_replaygain_reference_loudness'] or -18) - float(input_i):.2f}"
if input_tp and input_tp != "-inf": input_i = replaygain_lufs_result.get('input_i') if replaygain_lufs_result else None
replaygain_peak = f"{10 ** (float(input_tp) / 20):.6f}" input_tp = replaygain_lufs_result.get('input_tp') if replaygain_lufs_result else None
input_lra = replaygain_lufs_result.get('input_lra') if replaygain_lufs_result else None
if input_lra and input_lra != "-inf":
replaygain_range = f"{float(input_lra):.2f}" try:
if input_i:
except (json.JSONDecodeError, ValueError, TypeError): input_i_val = float(input_i)
pass except (TypeError, ValueError):
input_i_val = None
try:
if input_tp:
input_tp_val = float(input_tp)
except (TypeError, ValueError):
input_tp_val = None
try:
if input_lra:
input_lra_val = float(input_lra)
except (TypeError, ValueError):
input_lra_val = None
if input_i_val is not None and math.isfinite(input_i_val):
replaygain_gain = f"{(config.setting['acousticbrainz_ng_replaygain_reference_loudness'] or -18) - input_i_val:.2f}"
if input_tp_val is not None and math.isfinite(input_tp_val):
replaygain_peak = f"{10 ** (input_tp_val / 20):.6f}"
if input_lra_val is not None and math.isfinite(input_lra_val):
replaygain_range = f"{input_lra_val:.2f}"
result: Dict = { result: Dict = {
"replaygain_track_gain": replaygain_gain, "replaygain_track_gain": replaygain_gain,
@@ -721,7 +744,7 @@ class AcousticBrainzNG:
"replaygain_reference_loudness": f"{(config.setting['acousticbrainz_ng_replaygain_reference_loudness'] or -18):.2f}" "replaygain_reference_loudness": f"{(config.setting['acousticbrainz_ng_replaygain_reference_loudness'] or -18):.2f}"
} }
r128_result = subprocess.run( r128_proc = subprocess.run(
[ffmpeg_path, "-hide_banner", "-i", file_path, "-af", "loudnorm=I=-23:print_format=json", "-f", "null", "-"], [ffmpeg_path, "-hide_banner", "-i", file_path, "-af", "loudnorm=I=-23:print_format=json", "-f", "null", "-"],
capture_output=True, capture_output=True,
text=True, text=True,
@@ -729,36 +752,50 @@ class AcousticBrainzNG:
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
) )
if r128_result.returncode != 0: if r128_proc.returncode != 0:
log.error(f"FFmpeg failed for R128 calculation on file {file_path} with exit code {r128_result.returncode}") log.error(f"FFmpeg failed for R128 calculation on file {file_path} with exit code {r128_proc.returncode}")
if r128_result.stdout: if r128_proc.stdout:
log.error(f"FFmpeg stdout: {r128_result.stdout}") log.error(f"FFmpeg stdout: {r128_proc.stdout}")
if r128_result.stderr: if r128_proc.stderr:
log.error(f"FFmpeg stderr: {r128_result.stderr}") log.error(f"FFmpeg stderr: {r128_proc.stderr}")
return result return result
r128_log = r128_proc.stderr or r128_proc.stdout
r128_match = re.search(r'\{.*?\}', r128_log, re.S)
r128_matches = re.findall(r'\{.*?\}', r128_log, re.S) if not r128_match else None
r128_json_text = r128_match.group(0) if r128_match else (r128_matches[0] if r128_matches else None)
r128_track_gain = None r128_track_gain = None
r128_data: dict | None = None
if r128_json_text:
try:
r128_data = json.loads(r128_json_text)
except json.JSONDecodeError:
if r128_matches:
try:
r128_data = json.loads(r128_matches[-1])
except Exception:
r128_data = None
r128_input_i = r128_data.get('input_i') if r128_data else None
try: try:
json_start = r128_result.stderr.find('{') if r128_input_i:
if json_start != -1: r128_input_i_val = int(r128_input_i)
json_str = r128_result.stderr[json_start:] except (TypeError, ValueError):
json_end = json_str.find('}') + 1 r128_input_i_val = None
if json_end > 0:
r128_data = json.loads(json_str[:json_end]) if r128_input_i_val is not None and math.isfinite(r128_input_i_val):
r128_input_i = r128_data.get('input_i') r128_gain_db = -23 - r128_input_i_val
r128_track_gain = int(round(r128_gain_db * 256))
if r128_input_i and r128_input_i != "-inf":
r128_gain_db = -23 - float(r128_input_i) if r128_track_gain < -32768:
r128_track_gain = int(round(r128_gain_db * 256)) r128_track_gain = -32768
elif r128_track_gain > 32767:
if r128_track_gain < -32768: r128_track_gain = 32767
r128_track_gain = -32768
elif r128_track_gain > 32767:
r128_track_gain = 32767
except (json.JSONDecodeError, ValueError, TypeError):
pass
result["r128_track_gain"] = r128_track_gain result["r128_track_gain"] = r128_track_gain
@@ -787,7 +824,7 @@ class AcousticBrainzNG:
concat_file_path = concat_file.name concat_file_path = concat_file.name
try: try:
album_replaygain_result = subprocess.run( album_replaygain_proc = subprocess.run(
[ffmpeg_path, "-hide_banner", "-f", "concat", "-safe", "0", "-i", concat_file_path, [ffmpeg_path, "-hide_banner", "-f", "concat", "-safe", "0", "-i", concat_file_path,
"-vn", "-af", f"loudnorm=I={config.setting['acousticbrainz_ng_replaygain_reference_loudness']}:print_format=json", "-f", "null", "-"], "-vn", "-af", f"loudnorm=I={config.setting['acousticbrainz_ng_replaygain_reference_loudness']}:print_format=json", "-f", "null", "-"],
capture_output=True, capture_output=True,
@@ -796,49 +833,75 @@ class AcousticBrainzNG:
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
) )
if album_replaygain_result.returncode != 0: if album_replaygain_proc.returncode != 0:
log.error(f"FFmpeg failed for album ReplayGain calculation on {len(album_track_files)} files with exit code {album_replaygain_result.returncode}") log.error(f"FFmpeg failed for album ReplayGain calculation on {len(album_track_files)} files with exit code {album_replaygain_proc.returncode}")
log.error(f"Album files: {', '.join(album_track_files)}") log.error(f"Album files: {', '.join(album_track_files)}")
if album_replaygain_result.stdout: if album_replaygain_proc.stdout:
log.error(f"FFmpeg stdout: {album_replaygain_result.stdout}") log.error(f"FFmpeg stdout: {album_replaygain_proc.stdout}")
if album_replaygain_result.stderr: if album_replaygain_proc.stderr:
log.error(f"FFmpeg stderr: {album_replaygain_result.stderr}") log.error(f"FFmpeg stderr: {album_replaygain_proc.stderr}")
return {} return {}
album_replaygain_log = album_replaygain_proc.stderr or album_replaygain_proc.stdout
album_replaygain_match = re.search(r'\{.*?\}', album_replaygain_log, re.S)
album_replaygain_matches = re.findall(r'\{.*?\}', album_replaygain_log, re.S) if not album_replaygain_match else None
album_replaygain_json_text = album_replaygain_match.group(0) if album_replaygain_match else (album_replaygain_matches[0] if album_replaygain_matches else None)
album_gain = None album_gain = None
album_peak = None album_peak = None
album_range = None album_range = None
try: loudnorm_data: dict | None = None
json_start = album_replaygain_result.stderr.find('{')
if json_start != -1:
json_str = album_replaygain_result.stderr[json_start:]
json_end = json_str.find('}') + 1
if json_end > 0:
loudnorm_data = json.loads(json_str[:json_end])
input_i = loudnorm_data.get('input_i')
input_tp = loudnorm_data.get('input_tp')
input_lra = loudnorm_data.get('input_lra')
if input_i and input_i != "-inf":
album_gain = f"{(config.setting['acousticbrainz_ng_replaygain_reference_loudness'] or -18) - float(input_i):.2f}"
if input_tp and input_tp != "-inf":
album_peak = f"{10 ** (float(input_tp) / 20):.6f}"
if input_lra and input_lra != "-inf":
album_range = f"{float(input_lra):.2f}"
except (json.JSONDecodeError, ValueError, TypeError):
pass
if album_replaygain_json_text:
try:
loudnorm_data = json.loads(album_replaygain_json_text)
except json.JSONDecodeError:
if album_replaygain_matches:
try:
loudnorm_data = json.loads(album_replaygain_matches[-1])
except Exception:
loudnorm_data = None
input_i = loudnorm_data.get('input_i') if loudnorm_data else None
input_tp = loudnorm_data.get('input_tp') if loudnorm_data else None
input_lra = loudnorm_data.get('input_lra') if loudnorm_data else None
try:
if input_i:
input_i_val = float(input_i)
except (TypeError, ValueError):
input_i_val = None
try:
if input_tp:
input_tp_val = float(input_tp)
except (TypeError, ValueError):
input_tp_val = None
try:
if input_lra:
input_lra_val = float(input_lra)
except (TypeError, ValueError):
input_lra_val = None
if input_i_val is not None and math.isfinite(input_i_val):
album_gain = f"{(config.setting['acousticbrainz_ng_replaygain_reference_loudness'] or -18) - input_i_val:.2f}"
if input_tp_val is not None and math.isfinite(input_tp_val):
album_peak = f"{10 ** (input_tp_val / 20):.6f}"
if input_lra_val is not None and math.isfinite(input_lra_val):
album_range = f"{input_lra_val:.2f}"
result: Dict = { result: Dict = {
"replaygain_album_gain": album_gain, "replaygain_album_gain": album_gain,
"replaygain_album_peak": album_peak, "replaygain_album_peak": album_peak,
"replaygain_album_range": album_range "replaygain_album_range": album_range
} }
album_r128_result = subprocess.run( album_r128_proc = subprocess.run(
[ffmpeg_path, "-hide_banner", "-f", "concat", "-safe", "0", "-i", concat_file_path, [ffmpeg_path, "-hide_banner", "-f", "concat", "-safe", "0", "-i", concat_file_path,
"-vn", "-af", "loudnorm=I=-23:print_format=json", "-f", "null", "-"], "-vn", "-af", "loudnorm=I=-23:print_format=json", "-f", "null", "-"],
capture_output=True, capture_output=True,
@@ -847,37 +910,51 @@ class AcousticBrainzNG:
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
) )
if album_r128_result.returncode != 0: if album_r128_proc.returncode != 0:
log.error(f"FFmpeg failed for album R128 calculation on {len(album_track_files)} files with exit code {album_r128_result.returncode}") log.error(f"FFmpeg failed for album R128 calculation on {len(album_track_files)} files with exit code {album_r128_proc.returncode}")
log.error(f"Album files: {', '.join(album_track_files)}") log.error(f"Album files: {', '.join(album_track_files)}")
if album_r128_result.stdout: if album_r128_proc.stdout:
log.error(f"FFmpeg stdout: {album_r128_result.stdout}") log.error(f"FFmpeg stdout: {album_r128_proc.stdout}")
if album_r128_result.stderr: if album_r128_proc.stderr:
log.error(f"FFmpeg stderr: {album_r128_result.stderr}") log.error(f"FFmpeg stderr: {album_r128_proc.stderr}")
return result return result
album_r128_log = album_r128_proc.stderr or album_r128_proc.stdout
album_r128_match = re.search(r'\{.*?\}', album_r128_log, re.S)
album_r128_matches = re.findall(r'\{.*?\}', album_r128_log, re.S) if not album_r128_match else None
album_r128_json_text = album_r128_match.group(0) if album_r128_match else (album_r128_matches[0] if album_r128_matches else None)
r128_album_gain = None r128_album_gain = None
r128_data: dict | None = None
if album_r128_json_text:
try:
r128_data = json.loads(album_r128_json_text)
except json.JSONDecodeError:
if album_r128_matches:
try:
r128_data = json.loads(album_r128_matches[-1])
except Exception:
r128_data = None
r128_input_i = r128_data.get('input_i') if r128_data else None
try: try:
json_start = album_r128_result.stderr.find('{') if r128_input_i:
if json_start != -1: r128_input_i_val = int(r128_input_i)
json_str = album_r128_result.stderr[json_start:] except (TypeError, ValueError):
json_end = json_str.find('}') + 1 r128_input_i_val = None
if json_end > 0:
r128_data = json.loads(json_str[:json_end]) if r128_input_i_val is not None and math.isfinite(r128_input_i_val):
r128_input_i = r128_data.get('input_i') r128_gain_db = -23 - r128_input_i_val
r128_album_gain = int(round(r128_gain_db * 256))
if r128_input_i and r128_input_i != "-inf":
r128_gain_db = -23 - float(r128_input_i) if r128_album_gain < -32768:
r128_album_gain = int(round(r128_gain_db * 256)) r128_album_gain = -32768
elif r128_album_gain > 32767:
if r128_album_gain < -32768: r128_album_gain = 32767
r128_album_gain = -32768
elif r128_album_gain > 32767:
r128_album_gain = 32767
except (json.JSONDecodeError, ValueError, TypeError):
pass
result["r128_album_gain"] = r128_album_gain result["r128_album_gain"] = r128_album_gain