#!/usr/bin/env python3
"""
Discord Image Downloader and Base64 Converter

Parses all CSV files in the discord_chat_logs directory, extracts
attachment URLs, downloads the images, and saves them in base64 form
together with associated metadata (channel and sender information).
"""

import base64
import csv
import hashlib
import json
import time
import urllib.parse
from pathlib import Path
from typing import Optional

import requests

# Configuration
CSV_DIRECTORY = "../discord_chat_logs"
OUTPUT_DIRECTORY = "../images_dataset"
OUTPUT_JSON_FILE = "images_dataset.json"
MAX_RETRIES = 3
DELAY_BETWEEN_REQUESTS = 0.5  # seconds

# Supported image extensions (lowercase, leading dot included)
SUPPORTED_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.tiff'}


class ImageDownloader:
    """Downloads Discord attachment images listed in CSV chat logs and
    collects them as base64 records with per-message metadata."""

    def __init__(self, csv_dir: str, output_dir: str):
        """Prepare directories and a reusable HTTP session.

        Args:
            csv_dir: Directory containing the exported chat-log CSV files.
            output_dir: Directory where the JSON dataset will be written
                (created if missing).
        """
        self.csv_dir = Path(csv_dir)
        self.output_dir = Path(output_dir)
        # parents=True so a missing intermediate directory is not fatal.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.images_data: list = []        # accumulated image metadata records
        self.processed_urls: set = set()   # URLs already handled (dedup across files)

    def get_file_extension_from_url(self, url: str) -> Optional[str]:
        """Extract a supported image extension from a URL.

        Handles Discord CDN URLs, which may carry the image format either
        in the path suffix or in a ``format`` query parameter.

        Returns:
            The extension including the leading dot (e.g. ``'.png'``), or
            None if no supported extension can be determined.
        """
        parsed = urllib.parse.urlparse(url)
        path = parsed.path.lower()

        # Check the path's actual suffix rather than scanning for any
        # extension as a substring: a substring scan is nondeterministic
        # when several extensions match (set iteration order) and can hit
        # an "extension" embedded in a directory name.
        suffix = Path(path).suffix
        if suffix in SUPPORTED_EXTENSIONS:
            return suffix

        # Fall back to an explicit format in the query string
        # (e.g. ``...?format=webp`` on Discord media URLs).
        query_params = urllib.parse.parse_qs(parsed.query)
        if 'format' in query_params:
            format_val = query_params['format'][0].lower()
            if f'.{format_val}' in SUPPORTED_EXTENSIONS:
                return f'.{format_val}'

        return None

    def is_image_url(self, url: str) -> bool:
        """Return True if *url* is an http(s) URL with a supported image extension."""
        if not url or not url.startswith(('http://', 'https://')):
            return False
        return self.get_file_extension_from_url(url) is not None

    def download_image(self, url: str) -> Optional[bytes]:
        """Download an image from *url* with retries.

        Retries up to MAX_RETRIES times with a linearly increasing delay.
        Returns the raw bytes, or None if the download fails or the server
        does not return image content.
        """
        for attempt in range(MAX_RETRIES):
            try:
                print(f"Downloading: {url} (attempt {attempt + 1})")
                response = self.session.get(url, timeout=30)
                response.raise_for_status()

                # Verify content is actually an image before accepting it.
                content_type = response.headers.get('content-type', '').lower()
                if not content_type.startswith('image/'):
                    print(f"Warning: URL doesn't return image content: {url}")
                    return None

                return response.content

            except requests.exceptions.RequestException as e:
                print(f"Error downloading {url}: {e}")
                if attempt < MAX_RETRIES - 1:
                    # Back off a little more on each successive failure.
                    time.sleep(DELAY_BETWEEN_REQUESTS * (attempt + 1))
                else:
                    print(f"Failed to download after {MAX_RETRIES} attempts: {url}")
                    return None

        return None

    def process_csv_file(self, csv_path: Path) -> None:
        """Process one CSV chat log: extract, download, and encode its images.

        The CSV is expected to have an ``attachment_urls`` column that may
        contain several comma-separated URLs, plus author/message columns
        used for the metadata record.
        """
        channel_name = csv_path.stem
        print(f"\nProcessing channel: {channel_name}")

        try:
            with open(csv_path, 'r', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)

                for row in reader:
                    attachment_urls = row.get('attachment_urls', '').strip()
                    if not attachment_urls:
                        continue

                    # Split multiple URLs if they exist (comma-separated).
                    urls = [url.strip() for url in attachment_urls.split(',') if url.strip()]

                    for url in urls:
                        if url in self.processed_urls:
                            continue
                        if not self.is_image_url(url):
                            continue

                        self.processed_urls.add(url)

                        image_data = self.download_image(url)
                        if image_data is None:
                            continue

                        # Short URL hash gives each record a stable identifier.
                        url_hash = hashlib.md5(url.encode()).hexdigest()[:12]
                        file_extension = self.get_file_extension_from_url(url) or '.unknown'

                        base64_data = base64.b64encode(image_data).decode('utf-8')

                        image_metadata = {
                            'url': url,
                            'channel': channel_name,
                            'author_name': row.get('author_name', ''),
                            'author_nickname': row.get('author_nickname', ''),
                            'author_id': row.get('author_id', ''),
                            'message_id': row.get('message_id', ''),
                            'timestamp_utc': row.get('timestamp_utc', ''),
                            'content': row.get('content', ''),
                            'file_extension': file_extension,
                            'file_size': len(image_data),
                            'url_hash': url_hash,
                            'base64_data': base64_data
                        }

                        self.images_data.append(image_metadata)
                        print(f"āœ“ Downloaded and converted: {url} ({len(image_data)} bytes)")

                        # Small delay to be respectful to the CDN.
                        time.sleep(DELAY_BETWEEN_REQUESTS)

        except Exception as e:
            # Boundary handler: one bad file must not abort the whole run.
            print(f"Error processing {csv_path}: {e}")

    def save_dataset(self) -> None:
        """Write the collected images and summary statistics to a JSON file."""
        output_file = self.output_dir / OUTPUT_JSON_FILE

        # Summary statistics derived from the collected records.
        summary = {
            'total_images': len(self.images_data),
            'channels': list(set(img['channel'] for img in self.images_data)),
            'total_size_bytes': sum(img['file_size'] for img in self.images_data),
            'file_extensions': list(set(img['file_extension'] for img in self.images_data)),
            'authors': list(set(img['author_name'] for img in self.images_data if img['author_name']))
        }

        dataset = {
            'metadata': {
                'created_at': time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime()),
                'summary': summary
            },
            'images': self.images_data
        }

        with open(output_file, 'w', encoding='utf-8') as jsonfile:
            json.dump(dataset, jsonfile, indent=2, ensure_ascii=False)

        print(f"\nāœ“ Dataset saved to: {output_file}")
        print(f"Total images: {summary['total_images']}")
        print(f"Total size: {summary['total_size_bytes']:,} bytes")
        print(f"Channels: {', '.join(summary['channels'])}")

    def run(self) -> None:
        """Process every CSV file in the input directory and save the dataset."""
        print("Discord Image Downloader and Base64 Converter")
        print("=" * 50)

        csv_files = list(self.csv_dir.glob("*.csv"))
        if not csv_files:
            print(f"No CSV files found in {self.csv_dir}")
            return

        print(f"Found {len(csv_files)} CSV files to process")

        try:
            for csv_file in csv_files:
                self.process_csv_file(csv_file)

            if self.images_data:
                self.save_dataset()
            else:
                print("\nNo images were found or downloaded.")
        finally:
            # Release the pooled HTTP connections held by the session.
            self.session.close()


def main():
    """Main entry point: resolve directories relative to this script and run."""
    script_dir = Path(__file__).parent
    csv_directory = script_dir / CSV_DIRECTORY
    output_directory = script_dir / OUTPUT_DIRECTORY

    if not csv_directory.exists():
        print(f"Error: CSV directory not found: {csv_directory}")
        return

    downloader = ImageDownloader(str(csv_directory), str(output_directory))
    downloader.run()


if __name__ == "__main__":
    main()