Created
August 1, 2025 18:55
-
-
Save kubrick06010/4bb33afe39d2e662a4bdbd47c396e9dc to your computer and use it in GitHub Desktop.
# dscrob

Download, update, and upload Last.fm scrobbles.

## Usage

```
python dscrob.py download|update|upload [args]
```

## Limitations

Uploads must be less than 2 weeks old. Older scrobbles may be ignored by the API.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import requests | |
| import hashlib | |
| import time | |
| import pandas as pd | |
| import argparse | |
| import webbrowser | |
# Root endpoint of the Last.fm web-service API. HTTPS is used so the API key,
# session key and request signatures are not sent in clear text.
API_ROOT = "https://ws.audioscrobbler.com/2.0/"
# Maximum number of attempts made for a single HTTP request.
MAX_RETRIES = 5
# Base of the exponential backoff: attempt n waits RETRY_BACKOFF ** n seconds.
RETRY_BACKOFF = 2
def make_api_sig(params, secret):
    """Return the ``api_sig`` value (MD5 hex digest) for a set of parameters.

    The signature is built by concatenating ``<name><value>`` for every
    signed parameter in alphabetical order of name, appending the shared
    secret, and hashing the UTF-8 encoded result with MD5.

    Args:
        params: Mapping of request parameter names to values. The ``format``
            parameter and parameters whose value is the empty string are
            left out of the signature.
        secret: The API shared secret.

    Returns:
        The lowercase hexadecimal MD5 digest of the signed string.
    """
    signed_names = sorted(
        name for name in params if name != "format" and params[name] != ""
    )
    payload = "".join(f"{name}{params[name]}" for name in signed_names)
    # The string to sign is deliberately never printed: it contains the
    # shared secret (and the session key when one is being signed).
    return hashlib.md5((payload + secret).encode("utf-8")).hexdigest()
def safe_request_get(url, params):
    """Send a GET request, retrying with exponential backoff on failure.

    Args:
        url: Address to request.
        params: Query-string parameters handed to ``requests.get``.

    Returns:
        The response of the first attempt that completes without a request
        error or an error HTTP status.

    Raises:
        requests.RequestException: Re-raised from the last attempt once
            ``MAX_RETRIES`` attempts have failed.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
        except requests.RequestException as err:
            print(f"Request failed (attempt {attempt}/{MAX_RETRIES}): {err}")
            if attempt == MAX_RETRIES:
                raise
            # Back off RETRY_BACKOFF ** attempt seconds before trying again.
            delay = RETRY_BACKOFF ** attempt
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)
        else:
            return response
def safe_request_post(url, data):
    """Send a POST request, retrying with exponential backoff on failure.

    Args:
        url: Address to post to.
        data: Form payload handed to ``requests.post``.

    Returns:
        The response of the first attempt that completes without a request
        error or an error HTTP status.

    Raises:
        requests.RequestException: Re-raised from the last attempt once
            ``MAX_RETRIES`` attempts have failed.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = requests.post(url, data=data, timeout=10)
            response.raise_for_status()
        except requests.RequestException as err:
            print(f"Post request failed (attempt {attempt}/{MAX_RETRIES}): {err}")
            if attempt == MAX_RETRIES:
                raise
            # Back off RETRY_BACKOFF ** attempt seconds before trying again.
            delay = RETRY_BACKOFF ** attempt
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)
        else:
            return response
def get_session_key(api_key, api_secret):
    """Interactively authorize the app and return a Last.fm session key.

    The flow is: request a token (``auth.getToken``), open the authorization
    page in the browser and wait for the user to press Enter, then exchange
    the token for a session (``auth.getSession``, signed request).

    Args:
        api_key: Last.fm API key.
        api_secret: Last.fm API shared secret, used to sign the session call.

    Returns:
        The session key string from the ``auth.getSession`` response.

    Raises:
        Exception: If the token or the session is missing from the API reply.
        requests.RequestException: If an HTTP request keeps failing.
    """
    # Step 1: get an unauthorized request token.
    token_reply = safe_request_get(
        API_ROOT,
        {"method": "auth.getToken", "api_key": api_key, "format": "json"},
    ).json()
    token = token_reply.get("token")
    if not token:
        raise Exception("Could not get token: " + str(token_reply))

    # Step 2: let the user approve the token in the browser. HTTPS keeps the
    # API key and token out of clear-text traffic.
    url = f"https://www.last.fm/api/auth/?api_key={api_key}&token={token}"
    print(f"Please open the following URL and authorize the app:\n{url}")
    webbrowser.open(url)
    input("Press Enter once you have authorized the app...")

    # Step 3: exchange the authorized token for a session key.
    params = {
        "method": "auth.getSession",
        "api_key": api_key,
        "token": token,
        "format": "json",
    }
    params["api_sig"] = make_api_sig(params, api_secret)
    data = safe_request_get(API_ROOT, params).json()
    if "session" not in data:
        raise Exception(f"Failed to obtain session key: {data}")

    session_key = data["session"]["key"]
    # The key is shown on purpose so the user can store it for later uploads.
    print(f"Session Key obtained: {session_key}")
    print("Keep it safe for future uploads!")
    return session_key
def fetch_page(username, api_key, page=1, from_timestamp=None):
    """Fetch one page of a user's recent tracks and return the decoded JSON.

    Args:
        username: Last.fm user whose scrobbles are requested.
        api_key: Last.fm API key.
        page: Page number to request.
        from_timestamp: Optional value sent as the ``from`` parameter; it is
            omitted when falsy.

    Returns:
        The parsed JSON body of the ``user.getRecentTracks`` response.
    """
    query = {
        "method": "user.getRecentTracks",
        "user": username,
        "api_key": api_key,
        "format": "json",
        "limit": 200,
        "page": page,
    }
    if from_timestamp:
        query["from"] = from_timestamp
    return safe_request_get(API_ROOT, query).json()
def parse_tracks(json_data):
    """Convert one ``user.getRecentTracks`` JSON page into a list of records.

    Args:
        json_data: Decoded JSON page containing ``recenttracks -> track``.

    Returns:
        A list of dicts with the keys ``artist``, ``track``, ``album`` and
        ``date_unix`` (an int). Entries without a ``date.uts`` value are
        skipped.

    Raises:
        KeyError: If ``recenttracks`` or ``track`` is missing from the page,
            or an entry lacks the artist, name or album fields.
    """
    tracks = json_data["recenttracks"]["track"]
    # A lone track may arrive as a single object instead of a one-element
    # list; iterating the dict directly would walk its keys and crash.
    if isinstance(tracks, dict):
        tracks = [tracks]

    parsed = []
    for entry in tracks:
        uts = entry.get("date", {}).get("uts")
        if not uts:
            # No timestamp on this entry, so it cannot be stored.
            continue
        parsed.append({
            "artist": entry["artist"]["#text"],
            "track": entry["name"],
            "album": entry["album"]["#text"],
            "date_unix": int(uts),
        })
    return parsed
def download_scrobbles(username, api_key, from_timestamp=None):
    """Download every page of a user's scrobbles into a DataFrame.

    Args:
        username: Last.fm user whose scrobbles are downloaded.
        api_key: Last.fm API key.
        from_timestamp: Optional lower bound forwarded to ``fetch_page``.

    Returns:
        A DataFrame with the columns ``artist``, ``track``, ``album`` and
        ``date_unix``. The columns are present even when no scrobbles were
        found, so a saved CSV always carries its header.
    """
    first_page = fetch_page(username, api_key, page=1, from_timestamp=from_timestamp)
    # The first page reports how many pages the full result spans.
    total_pages = int(first_page["recenttracks"]["@attr"]["totalPages"])
    print(f"Total pages to download: {total_pages}")

    all_tracks = parse_tracks(first_page)
    for page in range(2, total_pages + 1):
        print(f"Downloading page {page}/{total_pages}...")
        page_json = fetch_page(username, api_key, page=page, from_timestamp=from_timestamp)
        all_tracks.extend(parse_tracks(page_json))

    # Fix the schema explicitly: an empty list would otherwise produce a
    # DataFrame with no columns at all.
    df = pd.DataFrame(all_tracks, columns=["artist", "track", "album", "date_unix"])
    print("Columns in downloaded data:", df.columns.tolist())
    return df
def get_last_timestamp(df):
    """Return the largest value in the ``date_unix`` column of ``df``.

    Raises:
        Exception: If ``df`` has no ``date_unix`` column.
    """
    if "date_unix" in df.columns:
        return df["date_unix"].max()
    raise Exception("DataFrame has no 'date_unix' column")
def update_scrobbles(existing_df, username, api_key):
    """Append scrobbles newer than those in ``existing_df`` and return the result.

    Args:
        existing_df: Previously saved scrobbles; must have a ``date_unix``
            column.
        username: Last.fm user whose scrobbles are downloaded.
        api_key: Last.fm API key.

    Returns:
        ``existing_df`` unchanged when nothing new was found, otherwise a new
        DataFrame holding the old and new rows with duplicates (same artist,
        track and timestamp) removed.

    Raises:
        Exception: If ``existing_df`` has no ``date_unix`` column.
    """
    last_ts = get_last_timestamp(existing_df)
    if pd.isna(last_ts):
        # An empty dataset has no maximum (NaN); fetch the full history
        # rather than sending "from=nan" to the API.
        print("No existing timestamps found; downloading full history.")
        from_timestamp = None
    else:
        print(f"Updating from timestamp: {last_ts}")
        # Start one second after the newest stored scrobble.
        from_timestamp = int(last_ts) + 1

    new_df = download_scrobbles(username, api_key, from_timestamp=from_timestamp)
    if new_df.empty:
        print("No new scrobbles found.")
        return existing_df

    combined = pd.concat([existing_df, new_df], ignore_index=True)
    combined = combined.drop_duplicates(subset=["artist", "track", "date_unix"])
    print(f"Updated dataset now has {len(combined)} scrobbles.")
    return combined
def upload_scrobbles(input_file, api_key, api_secret):
    """Scrobble every row of a CSV file to the authorizing user's account.

    Each row is sent as its own signed ``track.scrobble`` POST. A failure on
    one row is reported and the upload continues with the next row.

    Args:
        input_file: Path of a CSV file with ``artist``, ``track`` and
            ``date_unix`` columns.
        api_key: Last.fm API key.
        api_secret: Last.fm API shared secret.

    Raises:
        Exception: If a required column is missing from the file, or if the
            session key cannot be obtained.
    """
    df = pd.read_csv(input_file)
    print("Columns in upload file:", df.columns.tolist())
    if "date_unix" not in df.columns:
        raise Exception("Upload file must have 'date_unix' column")
    # Check the other columns up front, before the interactive
    # authorization step, instead of failing on the first row.
    missing = [name for name in ("artist", "track") if name not in df.columns]
    if missing:
        raise Exception(
            "Upload file is missing required column(s): " + ", ".join(missing)
        )

    session_key = get_session_key(api_key, api_secret)
    credential_fields = ("api_key", "sk", "api_sig")

    for idx, row in df.iterrows():
        params = {
            "method": "track.scrobble",
            "api_key": api_key,
            "sk": session_key,
            "artist": row["artist"],
            "track": row["track"],
            "timestamp": str(int(row["date_unix"])),
            "format": "json",
        }
        # Build the dictionary to sign, leaving out 'format' and empty values.
        sig_params = {k: v for k, v in params.items() if k != "format" and v != ""}
        params["api_sig"] = make_api_sig(sig_params, api_secret)

        # Mask credentials so the API key, session key and signature do not
        # end up in terminal scrollback or captured logs.
        loggable = {
            k: ("***" if k in credential_fields else v) for k, v in params.items()
        }
        print(f"Uploading scrobble index {idx} with params: {loggable}")
        try:
            safe_request_post(API_ROOT, params)
        except Exception as e:
            # Best effort: report the failed row and carry on with the rest.
            print(f"Error uploading scrobble index {idx}: {e}")
        else:
            # NOTE(review): this only reflects HTTP success; the response body
            # is not inspected for scrobbles the API chose to ignore.
            print(f"Scrobbled: {row['artist']} - {row['track']} at {row['date_unix']}")
        # Short pause between requests.
        time.sleep(0.25)
def main():
    """Parse the command line and run the download, update or upload command."""
    cli = argparse.ArgumentParser(description="Download or upload Last.fm scrobbles.")
    commands = cli.add_subparsers(dest="command", required=True)

    # download: save the whole scrobble history to a CSV file.
    download_cmd = commands.add_parser("download", help="Download scrobbles")
    download_cmd.add_argument("--username", required=True, help="Last.fm username")
    download_cmd.add_argument("--api_key", required=True, help="Last.fm API key")
    download_cmd.add_argument("--output", required=True, help="CSV file to save scrobbles")

    # update: extend an existing CSV file with newer scrobbles.
    update_cmd = commands.add_parser("update", help="Update scrobbles from existing CSV")
    update_cmd.add_argument("--username", required=True, help="Last.fm username")
    update_cmd.add_argument("--api_key", required=True, help="Last.fm API key")
    update_cmd.add_argument("--input", required=True, help="Existing CSV file")
    update_cmd.add_argument("--output", required=True, help="CSV file to save updated scrobbles")

    # upload: scrobble the rows of a CSV file.
    upload_cmd = commands.add_parser("upload", help="Upload scrobbles from CSV")
    upload_cmd.add_argument("--input", required=True, help="CSV file to read scrobbles")
    upload_cmd.add_argument("--api_key", required=True, help="Last.fm API key")
    upload_cmd.add_argument("--api_secret", required=True, help="Last.fm API shared secret")

    options = cli.parse_args()
    command = options.command

    if command == "download":
        scrobbles = download_scrobbles(options.username, options.api_key)
        scrobbles.to_csv(options.output, index=False)
        print(f"Downloaded scrobbles saved to {options.output}")
        return

    if command == "update":
        previous = pd.read_csv(options.input)
        refreshed = update_scrobbles(previous, options.username, options.api_key)
        refreshed.to_csv(options.output, index=False)
        print(f"Updated scrobbles saved to {options.output}")
        return

    if command == "upload":
        upload_scrobbles(options.input, options.api_key, options.api_secret)
# Run the command-line interface only when the file is executed as a script.
if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
🎵 dscrob.py — Last.fm Scrobble Downloader & Uploader
A simple Python script to download, update, and upload your Last.fm scrobbles using the official Last.fm API.
⚙️ Requirements
Usage
1. Download all scrobbles
💽 Saves your entire scrobble history to a CSV file.
2. Update an existing scrobble file
Downloads only new scrobbles and appends them to your existing file (no duplicates).
3. Upload scrobbles from CSV
🔼 Uploads each row in the CSV file to your Last.fm account as a scrobble.
The first time you run this, the script will open your browser to authorize the app.
Once you authorize it, return to the terminal and press Enter.

CSV File Format
CSV files must contain the following columns:
- artist
- track
- album
- date_unix

Example:
Authentication (Upload only)
Tips
🚫 Limitations
Happy scrobbling! 🎧