#!/usr/bin/env python3
"""
Timeline Data Overview Script

This script analyzes the Timeline.json file from Google Location History data
and provides comprehensive statistics about the tracked location data.
"""

import json
import math
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, Optional, Tuple


def find_timeline_json(start_path: str) -> Optional[str]:
    """Return the path of the first Timeline.json found under start_path.

    Walks the directory tree depth-first; returns None if no file is found.
    """
    for root, _dirs, files in os.walk(start_path):
        if 'Timeline.json' in files:
            return os.path.join(root, 'Timeline.json')
    return None


def parse_datetime(timestamp: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp to a datetime, or None if unparseable.

    Google exports mark UTC with a trailing 'Z', which
    datetime.fromisoformat does not accept before Python 3.11, so it is
    normalized to '+00:00' first.
    """
    try:
        return datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    except ValueError:
        return None


def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate distance between two points using the Haversine formula (km)."""
    R = 6371  # Earth's mean radius in km

    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    delta_lat = math.radians(lat2 - lat1)
    delta_lon = math.radians(lon2 - lon1)

    a = (math.sin(delta_lat / 2) ** 2
         + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return R * c


def parse_coordinates(point_str: str) -> Tuple[Optional[float], Optional[float]]:
    """Parse a coordinate string like '51.6659027°, -0.4058773°' to (lat, lon).

    Returns (None, None) when the string does not contain two parseable
    degree values.
    """
    try:
        coords = point_str.replace('°', '').split(', ')
        return float(coords[0]), float(coords[1])
    except (ValueError, IndexError):
        return None, None


def analyze_timeline_data(timeline_path: str) -> Dict[str, Any]:
    """Analyze the timeline data and return comprehensive statistics.

    Reads the whole JSON file into memory and walks every entry in its
    'semanticSegments' list, accumulating counts, date ranges, durations,
    visit metadata, and place coordinates into a flat stats dict.
    """
    print(f"Loading timeline data from: {timeline_path}")
    print("This may take a moment for large files...")

    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    semantic_segments = data.get('semanticSegments', [])

    # Initialize counters and collections
    stats: Dict[str, Any] = {
        'total_segments': len(semantic_segments),
        'visits': 0,
        'timeline_paths': 0,
        'semantic_types': Counter(),
        'place_ids': Counter(),
        'monthly_activity': defaultdict(int),
        'yearly_activity': defaultdict(int),
        'daily_activity': defaultdict(int),
        'total_duration_hours': 0,
        'date_range': {'earliest': None, 'latest': None},
        'locations': [],
        'probabilities': [],
    }

    print("Analyzing segments...")

    for i, segment in enumerate(semantic_segments):
        # Progress indicator for large exports
        if i % 10000 == 0 and i > 0:
            print(f"Processed {i:,} segments...")

        # Parse timestamps
        start_time = parse_datetime(segment.get('startTime', ''))
        end_time = parse_datetime(segment.get('endTime', ''))

        if start_time:
            # Update date range (based on segment start times only)
            if stats['date_range']['earliest'] is None or start_time < stats['date_range']['earliest']:
                stats['date_range']['earliest'] = start_time
            if stats['date_range']['latest'] is None or start_time > stats['date_range']['latest']:
                stats['date_range']['latest'] = start_time

            # Monthly, yearly, and day-of-week activity buckets
            month_key = start_time.strftime('%Y-%m')
            year_key = start_time.strftime('%Y')
            day_key = start_time.strftime('%A')

            stats['monthly_activity'][month_key] += 1
            stats['yearly_activity'][year_key] += 1
            stats['daily_activity'][day_key] += 1

        # Accumulate total tracked duration
        if start_time and end_time:
            duration = end_time - start_time
            stats['total_duration_hours'] += duration.total_seconds() / 3600

        # Analyze visits
        if 'visit' in segment:
            stats['visits'] += 1
            visit = segment['visit']

            # Probability analysis
            if 'probability' in visit:
                stats['probabilities'].append(visit['probability'])

            # Semantic types
            top_candidate = visit.get('topCandidate', {})
            if 'semanticType' in top_candidate:
                semantic_type = top_candidate['semanticType']
                stats['semantic_types'][semantic_type] += 1

            # Place IDs
            if 'placeId' in top_candidate:
                place_id = top_candidate['placeId']
                stats['place_ids'][place_id] += 1

            # Location coordinates
            place_location = top_candidate.get('placeLocation', {})
            if 'latLng' in place_location:
                lat, lon = parse_coordinates(place_location['latLng'])
                if lat is not None and lon is not None:
                    stats['locations'].append((lat, lon))

        # Analyze timeline paths
        if 'timelinePath' in segment:
            stats['timeline_paths'] += 1

    # Derived probability statistics
    if stats['probabilities']:
        stats['avg_probability'] = sum(stats['probabilities']) / len(stats['probabilities'])
        stats['min_probability'] = min(stats['probabilities'])
        stats['max_probability'] = max(stats['probabilities'])

    if stats['locations']:
        # Find geographic bounds
        lats = [loc[0] for loc in stats['locations']]
        lons = [loc[1] for loc in stats['locations']]
        stats['geographic_bounds'] = {
            'north': max(lats),
            'south': min(lats),
            'east': max(lons),
            'west': min(lons)
        }

        # Calculate approximate geographic span. Only a strided sample of the
        # first 1000 locations is compared pairwise, to keep this O(1)-ish
        # regardless of export size; the result is a lower bound on the true
        # maximum span.
        if len(stats['locations']) > 1:
            max_distance = 0
            for i in range(0, min(len(stats['locations']), 1000), 10):
                for j in range(i + 1, min(len(stats['locations']), 1000), 10):
                    dist = calculate_distance(
                        stats['locations'][i][0], stats['locations'][i][1],
                        stats['locations'][j][0], stats['locations'][j][1]
                    )
                    max_distance = max(max_distance, dist)
            stats['max_distance_km'] = max_distance

    return stats


def print_statistics(stats: Dict[str, Any]) -> None:
    """Print comprehensive statistics in a readable format.

    Expects the dict produced by analyze_timeline_data(); sections whose
    data is absent are skipped.
    """
    print("\n" + "="*80)
    print("TIMELINE DATA OVERVIEW")
    print("="*80)

    # Basic counts
    print(f"\nšŸ“Š BASIC STATISTICS")
    print(f"   Total segments: {stats['total_segments']:,}")
    print(f"   Visits: {stats['visits']:,}")
    print(f"   Timeline paths: {stats['timeline_paths']:,}")
    print(f"   Unique places: {len(stats['place_ids']):,}")

    # Defined up front: the duration section below reads total_days even when
    # no date range could be parsed (previously a NameError in that case).
    total_days = 0

    # Date range
    if stats['date_range']['earliest'] and stats['date_range']['latest']:
        earliest = stats['date_range']['earliest']
        latest = stats['date_range']['latest']
        total_days = (latest - earliest).days
        print(f"\nšŸ“… DATE RANGE")
        print(f"   Earliest record: {earliest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"   Latest record: {latest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"   Total span: {total_days:,} days ({total_days/365.25:.1f} years)")

    # Duration statistics
    if stats['total_duration_hours'] > 0:
        print(f"\nā° DURATION STATISTICS")
        print(f"   Total tracked time: {stats['total_duration_hours']:,.1f} hours")
        print(f"   Average per day: {stats['total_duration_hours'] / max(total_days, 1):.1f} hours")

    # Probability statistics
    if 'avg_probability' in stats:
        print(f"\nšŸŽÆ PROBABILITY STATISTICS")
        print(f"   Average confidence: {stats['avg_probability']:.3f}")
        print(f"   Min confidence: {stats['min_probability']:.3f}")
        print(f"   Max confidence: {stats['max_probability']:.3f}")

    # Geographic statistics
    if 'geographic_bounds' in stats:
        bounds = stats['geographic_bounds']
        print(f"\nšŸŒ GEOGRAPHIC STATISTICS")
        print(f"   Northern bound: {bounds['north']:.6f}°")
        print(f"   Southern bound: {bounds['south']:.6f}°")
        print(f"   Eastern bound: {bounds['east']:.6f}°")
        print(f"   Western bound: {bounds['west']:.6f}°")
        if 'max_distance_km' in stats:
            print(f"   Max distance span: {stats['max_distance_km']:.1f} km")

    # Top semantic types
    if stats['semantic_types']:
        print(f"\nšŸ·ļø  TOP LOCATION TYPES")
        for semantic_type, count in stats['semantic_types'].most_common(10):
            percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0
            print(f"   {semantic_type:<15}: {count:,} ({percentage:.1f}%)")

    # Top places
    if stats['place_ids']:
        print(f"\nšŸ“ TOP VISITED PLACES")
        for i, (place_id, count) in enumerate(stats['place_ids'].most_common(5)):
            percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0
            print(f"   #{i+1:<2} {place_id[:30]:<30}: {count:,} visits ({percentage:.1f}%)")

    # Yearly activity
    if stats['yearly_activity']:
        print(f"\nšŸ“ˆ ACTIVITY BY YEAR")
        for year in sorted(stats['yearly_activity'].keys()):
            count = stats['yearly_activity'][year]
            print(f"   {year}: {count:,} segments")

    # Daily patterns
    if stats['daily_activity']:
        print(f"\nšŸ“† ACTIVITY BY DAY OF WEEK")
        days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        for day in days_order:
            count = stats['daily_activity'].get(day, 0)
            if count > 0:
                print(f"   {day:<10}: {count:,} segments")

    # Monthly activity (show recent 12 months)
    if stats['monthly_activity']:
        print(f"\nšŸ“Š RECENT MONTHLY ACTIVITY")
        sorted_months = sorted(stats['monthly_activity'].keys())[-12:]
        for month in sorted_months:
            count = stats['monthly_activity'][month]
            print(f"   {month}: {count:,} segments")


def main() -> None:
    """Locate Timeline.json under the repo root, analyze it, print results."""
    # Get the script directory and find the repo root
    script_dir = os.path.dirname(os.path.abspath(__file__))
    repo_root = os.path.dirname(script_dir)

    # Look for Timeline.json starting from repo root
    timeline_path = find_timeline_json(repo_root)

    if not timeline_path:
        print("āŒ Timeline.json file not found!")
        print(f"Searched in: {repo_root}")
        sys.exit(1)

    try:
        # Analyze the data
        stats = analyze_timeline_data(timeline_path)

        # Print the results
        print_statistics(stats)

        print(f"\nāœ… Analysis complete! File analyzed: {timeline_path}")

    except FileNotFoundError:
        print(f"āŒ Error: Could not find Timeline.json at {timeline_path}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"āŒ Error: Invalid JSON format in Timeline.json - {e}")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report and exit non-zero rather than traceback
        print(f"āŒ Error analyzing timeline data: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()