# File: cult-scraper/scripts/image_downloader.py (229 lines, 8.8 KiB, executable Python script)
#!/usr/bin/env python3
"""
Discord Image Downloader and Base64 Converter
This script parses all CSV files in the discord_chat_logs directory,
extracts attachment URLs, downloads the images, and saves them in base64
format with associated metadata (channel and sender information).
"""
import csv
import os
import base64
import json
import requests
import urllib.parse
from pathlib import Path
from typing import Dict, List, Optional
import time
import hashlib
# Configuration (paths are relative to this script's directory; see main())
CSV_DIRECTORY = "../discord_chat_logs"
OUTPUT_DIRECTORY = "../images_dataset"
OUTPUT_JSON_FILE = "images_dataset.json"
MAX_RETRIES = 3                 # download attempts per URL
DELAY_BETWEEN_REQUESTS = 0.5    # seconds; politeness delay and backoff base

# Supported image extensions
SUPPORTED_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.tiff'}


class ImageDownloader:
    """Walk CSV chat logs, download image attachments, and collect each one
    as a base64 record with channel/author metadata.

    Typical use: ``ImageDownloader(csv_dir, out_dir).run()``.
    """

    def __init__(self, csv_dir: str, output_dir: str):
        """Create a downloader reading CSVs from *csv_dir* and writing the
        JSON dataset into *output_dir* (created if missing)."""
        self.csv_dir = Path(csv_dir)
        self.output_dir = Path(output_dir)
        # parents=True: a missing intermediate directory should not crash us.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.session = requests.Session()
        # A browser-like User-Agent avoids trivial bot blocks on CDN hosts.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.images_data: List[Dict] = []   # accumulated per-image records
        self.processed_urls: set = set()    # dedupe across rows and files

    def get_file_extension_from_url(self, url: str) -> Optional[str]:
        """Extract file extension from URL, handling Discord CDN URLs.

        Returns a lowercase extension such as '.png', or None when the URL
        does not carry a supported image extension.
        """
        parsed = urllib.parse.urlparse(url)
        path = parsed.path.lower()
        # BUGFIX: anchor the match at the end of the path. The previous
        # substring test (`ext in path`) spuriously matched paths such as
        # '/a.png/readme.txt'.
        for ext in SUPPORTED_EXTENSIONS:
            if path.endswith(ext):
                return ext
        # Discord CDN sometimes encodes the type as a ?format=... parameter.
        query_params = urllib.parse.parse_qs(parsed.query)
        if 'format' in query_params:
            format_val = query_params['format'][0].lower()
            if f'.{format_val}' in SUPPORTED_EXTENSIONS:
                return f'.{format_val}'
        return None

    def is_image_url(self, url: str) -> bool:
        """Return True when *url* is an http(s) URL with a supported image extension."""
        if not url or not url.startswith(('http://', 'https://')):
            return False
        return self.get_file_extension_from_url(url) is not None

    def download_image(self, url: str) -> Optional[bytes]:
        """Download image from URL with retries.

        Returns the raw bytes, or None after MAX_RETRIES network failures or
        when the server does not answer with an image/* content type.
        """
        for attempt in range(MAX_RETRIES):
            try:
                print(f"Downloading: {url} (attempt {attempt + 1})")
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                # Verify content is actually an image before keeping it.
                content_type = response.headers.get('content-type', '').lower()
                if not content_type.startswith('image/'):
                    print(f"Warning: URL doesn't return image content: {url}")
                    return None
                return response.content
            except requests.exceptions.RequestException as e:
                print(f"Error downloading {url}: {e}")
                if attempt < MAX_RETRIES - 1:
                    # Linear backoff before the next attempt.
                    time.sleep(DELAY_BETWEEN_REQUESTS * (attempt + 1))
                else:
                    print(f"Failed to download after {MAX_RETRIES} attempts: {url}")
                    return None
        return None

    def _build_record(self, url: str, channel_name: str, row: Dict[str, str],
                      image_data: bytes) -> Dict:
        """Assemble the metadata + base64 record for one downloaded image."""
        # Short URL hash gives each image a stable unique identifier.
        url_hash = hashlib.md5(url.encode()).hexdigest()[:12]
        file_extension = self.get_file_extension_from_url(url) or '.unknown'
        base64_data = base64.b64encode(image_data).decode('utf-8')
        return {
            'url': url,
            'channel': channel_name,
            'author_name': row.get('author_name', ''),
            'author_nickname': row.get('author_nickname', ''),
            'author_id': row.get('author_id', ''),
            'message_id': row.get('message_id', ''),
            'timestamp_utc': row.get('timestamp_utc', ''),
            'content': row.get('content', ''),
            'file_extension': file_extension,
            'file_size': len(image_data),
            'url_hash': url_hash,
            'base64_data': base64_data
        }

    def process_csv_file(self, csv_path: Path) -> None:
        """Process a single CSV file: download every new image attachment and
        append its record to self.images_data. The CSV stem is used as the
        channel name."""
        channel_name = csv_path.stem
        print(f"\nProcessing channel: {channel_name}")
        try:
            with open(csv_path, 'r', encoding='utf-8') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    attachment_urls = row.get('attachment_urls', '').strip()
                    if not attachment_urls:
                        continue
                    # Multiple attachments are comma-separated in one cell.
                    urls = [url.strip() for url in attachment_urls.split(',') if url.strip()]
                    for url in urls:
                        if url in self.processed_urls:
                            continue    # already handled via another row/file
                        if not self.is_image_url(url):
                            continue
                        self.processed_urls.add(url)
                        image_data = self.download_image(url)
                        if image_data is None:
                            continue
                        self.images_data.append(
                            self._build_record(url, channel_name, row, image_data))
                        print(f"✓ Downloaded and converted: {url} ({len(image_data)} bytes)")
                        # Small delay to be respectful
                        time.sleep(DELAY_BETWEEN_REQUESTS)
        except Exception as e:
            # Best-effort: a malformed CSV must not abort the whole run.
            print(f"Error processing {csv_path}: {e}")

    def save_dataset(self) -> None:
        """Save the collected images dataset (summary + records) as JSON in
        output_dir."""
        output_file = self.output_dir / OUTPUT_JSON_FILE
        # sorted() makes the summary deterministic across runs (set order isn't).
        summary = {
            'total_images': len(self.images_data),
            'channels': sorted(set(img['channel'] for img in self.images_data)),
            'total_size_bytes': sum(img['file_size'] for img in self.images_data),
            'file_extensions': sorted(set(img['file_extension'] for img in self.images_data)),
            'authors': sorted(set(img['author_name'] for img in self.images_data if img['author_name']))
        }
        dataset = {
            'metadata': {
                'created_at': time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime()),
                'summary': summary
            },
            'images': self.images_data
        }
        with open(output_file, 'w', encoding='utf-8') as jsonfile:
            json.dump(dataset, jsonfile, indent=2, ensure_ascii=False)
        print(f"\n✓ Dataset saved to: {output_file}")
        print(f"Total images: {summary['total_images']}")
        print(f"Total size: {summary['total_size_bytes']:,} bytes")
        print(f"Channels: {', '.join(summary['channels'])}")

    def run(self) -> None:
        """Main execution function: process every CSV in csv_dir, then write
        the dataset if anything was downloaded."""
        print("Discord Image Downloader and Base64 Converter")
        print("=" * 50)
        csv_files = list(self.csv_dir.glob("*.csv"))
        if not csv_files:
            print(f"No CSV files found in {self.csv_dir}")
            return
        print(f"Found {len(csv_files)} CSV files to process")
        for csv_file in csv_files:
            self.process_csv_file(csv_file)
        if self.images_data:
            self.save_dataset()
        else:
            print("\nNo images were found or downloaded.")
def main():
    """Main entry point."""
    # Resolve configured directories relative to this script's location.
    base_dir = Path(__file__).parent
    input_dir = base_dir / CSV_DIRECTORY
    dataset_dir = base_dir / OUTPUT_DIRECTORY
    if not input_dir.exists():
        print(f"Error: CSV directory not found: {input_dir}")
        return
    ImageDownloader(str(input_dir), str(dataset_dir)).run()


if __name__ == "__main__":
    main()