#!/usr/bin/env python3 """ Timeline Data Overview Script This script analyzes the Timeline.json file from Google Location History data and provides comprehensive statistics about the tracked location data. """ import json import os import sys from datetime import datetime, timedelta from collections import Counter, defaultdict from typing import Dict, List, Any import math from io import StringIO def find_timeline_json(start_path: str) -> str: """Find the Timeline.json file starting from the given path.""" for root, dirs, files in os.walk(start_path): if 'Timeline.json' in files: return os.path.join(root, 'Timeline.json') return None def parse_datetime(timestamp: str) -> datetime: """Parse ISO format timestamp to datetime object.""" try: # Handle timestamps with timezone info if timestamp.endswith('Z'): timestamp = timestamp[:-1] + '+00:00' return datetime.fromisoformat(timestamp.replace('Z', '+00:00')) except ValueError: # Fallback for different formats try: return datetime.fromisoformat(timestamp) except ValueError: return None def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate distance between two points using Haversine formula (in kilometers).""" R = 6371 # Earth's radius in km lat1_rad = math.radians(lat1) lat2_rad = math.radians(lat2) delta_lat = math.radians(lat2 - lat1) delta_lon = math.radians(lon2 - lon1) a = (math.sin(delta_lat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2) c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return R * c def parse_coordinates(point_str: str) -> tuple: """Parse coordinate string like '51.6659027°, -0.4058773°' to lat, lon.""" try: coords = point_str.replace('°', '').split(', ') return float(coords[0]), float(coords[1]) except (ValueError, IndexError): return None, None def analyze_timeline_data(timeline_path: str) -> Dict[str, Any]: """Analyze the timeline data and return comprehensive statistics.""" print(f"Loading timeline data from: {timeline_path}") print("This may take a moment for large files...") with open(timeline_path, 'r', encoding='utf-8') as f: data = json.load(f) semantic_segments = data.get('semanticSegments', []) # Initialize counters and collections stats = { 'total_segments': len(semantic_segments), 'visits': 0, 'timeline_paths': 0, 'semantic_types': Counter(), 'place_ids': Counter(), 'monthly_activity': defaultdict(int), 'yearly_activity': defaultdict(int), 'daily_activity': defaultdict(int), 'total_duration_hours': 0, 'date_range': {'earliest': None, 'latest': None}, 'locations': [], 'probabilities': [], } print("Analyzing segments...") for i, segment in enumerate(semantic_segments): if i % 10000 == 0 and i > 0: print(f"Processed {i:,} segments...") # Parse timestamps start_time = parse_datetime(segment.get('startTime', '')) end_time = parse_datetime(segment.get('endTime', '')) if start_time: # Update date range if stats['date_range']['earliest'] is None or start_time < stats['date_range']['earliest']: stats['date_range']['earliest'] = start_time if stats['date_range']['latest'] is None or start_time > stats['date_range']['latest']: stats['date_range']['latest'] = start_time # Monthly and yearly activity month_key = start_time.strftime('%Y-%m') year_key = start_time.strftime('%Y') day_key = start_time.strftime('%A') stats['monthly_activity'][month_key] += 1 stats['yearly_activity'][year_key] += 1 stats['daily_activity'][day_key] += 1 # Calculate duration if start_time and end_time: duration = end_time - start_time stats['total_duration_hours'] += duration.total_seconds() / 3600 # Analyze visits if 'visit' in segment: stats['visits'] += 1 visit = segment['visit'] # Probability analysis if 'probability' in visit: stats['probabilities'].append(visit['probability']) # Semantic types top_candidate = visit.get('topCandidate', {}) if 'semanticType' in top_candidate: semantic_type = top_candidate['semanticType'] stats['semantic_types'][semantic_type] += 1 # Place IDs if 'placeId' in top_candidate: place_id = top_candidate['placeId'] stats['place_ids'][place_id] += 1 # Location coordinates place_location = top_candidate.get('placeLocation', {}) if 'latLng' in place_location: lat, lon = parse_coordinates(place_location['latLng']) if lat is not None and lon is not None: stats['locations'].append((lat, lon)) # Analyze timeline paths if 'timelinePath' in segment: stats['timeline_paths'] += 1 # Calculate additional statistics if stats['probabilities']: stats['avg_probability'] = sum(stats['probabilities']) / len(stats['probabilities']) stats['min_probability'] = min(stats['probabilities']) stats['max_probability'] = max(stats['probabilities']) if stats['locations']: # Find geographic bounds lats = [loc[0] for loc in stats['locations']] lons = [loc[1] for loc in stats['locations']] stats['geographic_bounds'] = { 'north': max(lats), 'south': min(lats), 'east': max(lons), 'west': min(lons) } # Calculate approximate geographic span if len(stats['locations']) > 1: max_distance = 0 for i in range(0, min(len(stats['locations']), 1000), 10): # Sample for performance for j in range(i + 1, min(len(stats['locations']), 1000), 10): dist = calculate_distance( stats['locations'][i][0], stats['locations'][i][1], stats['locations'][j][0], stats['locations'][j][1] ) max_distance = max(max_distance, dist) stats['max_distance_km'] = max_distance return stats def export_statistics_to_file(stats: Dict[str, Any], output_path: str): """Export comprehensive statistics to a text file.""" with open(output_path, 'w', encoding='utf-8') as f: # Redirect print statements to file original_stdout = sys.stdout sys.stdout = f print_statistics(stats) # Restore stdout sys.stdout = original_stdout def analyze_json_structure(timeline_path: str, output_path: str): """Analyze and export the JSON structure to a text file.""" print(f"Analyzing JSON structure from: {timeline_path}") with open(timeline_path, 'r', encoding='utf-8') as f: data = json.load(f) def explore_structure(obj, path="", depth=0, max_depth=4): """Recursively explore JSON structure.""" indent = " " * depth structure_info = [] if depth > max_depth: return [f"{indent}... (max depth reached)"] if isinstance(obj, dict): structure_info.append(f"{indent}{path} (dict) - {len(obj)} keys:") for key, value in list(obj.items())[:10]: # Limit to first 10 keys key_path = f"{path}.{key}" if path else key if isinstance(value, (dict, list)): structure_info.extend(explore_structure(value, key_path, depth + 1, max_depth)) else: value_type = type(value).__name__ if isinstance(value, str) and len(value) > 50: sample = value[:50] + "..." else: sample = str(value) structure_info.append(f"{indent} {key}: {value_type} = {sample}") if len(obj) > 10: structure_info.append(f"{indent} ... and {len(obj) - 10} more keys") elif isinstance(obj, list): structure_info.append(f"{indent}{path} (list) - {len(obj)} items:") if obj: structure_info.append(f"{indent} Sample item structure:") structure_info.extend(explore_structure(obj[0], f"{path}[0]", depth + 1, max_depth)) if len(obj) > 1: structure_info.append(f"{indent} ... and {len(obj) - 1} more items") else: value_type = type(obj).__name__ structure_info.append(f"{indent}{path}: {value_type} = {obj}") return structure_info with open(output_path, 'w', encoding='utf-8') as f: f.write("="*80 + "\n") f.write("TIMELINE JSON STRUCTURE ANALYSIS\n") f.write("="*80 + "\n\n") f.write(f"File: {timeline_path}\n") f.write(f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") # Overall structure f.write("ROOT LEVEL STRUCTURE:\n") f.write("-" * 40 + "\n") for key, value in data.items(): if isinstance(value, list): f.write(f"{key}: list with {len(value)} items\n") elif isinstance(value, dict): f.write(f"{key}: dict with {len(value)} keys\n") else: f.write(f"{key}: {type(value).__name__} = {value}\n") f.write("\n" + "="*80 + "\n") f.write("DETAILED STRUCTURE:\n") f.write("="*80 + "\n\n") # Detailed structure analysis structure_lines = explore_structure(data) for line in structure_lines: f.write(line + "\n") # Sample semantic segment analysis semantic_segments = data.get('semanticSegments', []) if semantic_segments: f.write("\n" + "="*80 + "\n") f.write("SEMANTIC SEGMENTS ANALYSIS:\n") f.write("="*80 + "\n\n") f.write(f"Total semantic segments: {len(semantic_segments)}\n\n") # Analyze different types of segments visit_count = sum(1 for seg in semantic_segments if 'visit' in seg) path_count = sum(1 for seg in semantic_segments if 'timelinePath' in seg) f.write(f"Segments with visits: {visit_count}\n") f.write(f"Segments with timeline paths: {path_count}\n\n") # Sample visit structure sample_visit = None sample_path = None for segment in semantic_segments[:100]: # Check first 100 segments if 'visit' in segment and sample_visit is None: sample_visit = segment if 'timelinePath' in segment and sample_path is None: sample_path = segment if sample_visit and sample_path: break if sample_visit: f.write("SAMPLE VISIT STRUCTURE:\n") f.write("-" * 40 + "\n") visit_structure = explore_structure(sample_visit, "sample_visit") for line in visit_structure: f.write(line + "\n") f.write("\n") if sample_path: f.write("SAMPLE TIMELINE PATH STRUCTURE:\n") f.write("-" * 40 + "\n") path_structure = explore_structure(sample_path, "sample_timelinePath") for line in path_structure: f.write(line + "\n") def print_statistics(stats: Dict[str, Any]): """Print comprehensive statistics in a readable format.""" print("\n" + "="*80) print("TIMELINE DATA OVERVIEW") print("="*80) # Basic counts print(f"\nšŸ“Š BASIC STATISTICS") print(f" Total segments: {stats['total_segments']:,}") print(f" Visits: {stats['visits']:,}") print(f" Timeline paths: {stats['timeline_paths']:,}") print(f" Unique places: {len(stats['place_ids']):,}") # Date range if stats['date_range']['earliest'] and stats['date_range']['latest']: earliest = stats['date_range']['earliest'] latest = stats['date_range']['latest'] total_days = (latest - earliest).days print(f"\nšŸ“… DATE RANGE") print(f" Earliest record: {earliest.strftime('%Y-%m-%d %H:%M:%S')}") print(f" Latest record: {latest.strftime('%Y-%m-%d %H:%M:%S')}") print(f" Total span: {total_days:,} days ({total_days/365.25:.1f} years)") # Duration statistics if stats['total_duration_hours'] > 0: print(f"\nā° DURATION STATISTICS") print(f" Total tracked time: {stats['total_duration_hours']:,.1f} hours") print(f" Average per day: {stats['total_duration_hours'] / max(total_days, 1):.1f} hours") # Probability statistics if 'avg_probability' in stats: print(f"\nšŸŽÆ PROBABILITY STATISTICS") print(f" Average confidence: {stats['avg_probability']:.3f}") print(f" Min confidence: {stats['min_probability']:.3f}") print(f" Max confidence: {stats['max_probability']:.3f}") # Geographic statistics if 'geographic_bounds' in stats: bounds = stats['geographic_bounds'] print(f"\nšŸŒ GEOGRAPHIC STATISTICS") print(f" Northern bound: {bounds['north']:.6f}°") print(f" Southern bound: {bounds['south']:.6f}°") print(f" Eastern bound: {bounds['east']:.6f}°") print(f" Western bound: {bounds['west']:.6f}°") if 'max_distance_km' in stats: print(f" Max distance span: {stats['max_distance_km']:.1f} km") # Top semantic types if stats['semantic_types']: print(f"\nšŸ·ļø TOP LOCATION TYPES") for semantic_type, count in stats['semantic_types'].most_common(10): percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0 print(f" {semantic_type:<15}: {count:,} ({percentage:.1f}%)") # Top places if stats['place_ids']: print(f"\nšŸ“ TOP VISITED PLACES") for i, (place_id, count) in enumerate(stats['place_ids'].most_common(5)): percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0 print(f" #{i+1:<2} {place_id[:30]:<30}: {count:,} visits ({percentage:.1f}%)") # Yearly activity if stats['yearly_activity']: print(f"\nšŸ“ˆ ACTIVITY BY YEAR") for year in sorted(stats['yearly_activity'].keys()): count = stats['yearly_activity'][year] print(f" {year}: {count:,} segments") # Daily patterns if stats['daily_activity']: print(f"\nšŸ“† ACTIVITY BY DAY OF WEEK") days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] for day in days_order: count = stats['daily_activity'].get(day, 0) if count > 0: print(f" {day:<10}: {count:,} segments") # Monthly activity (show recent 12 months) if stats['monthly_activity']: print(f"\nšŸ“Š RECENT MONTHLY ACTIVITY") sorted_months = sorted(stats['monthly_activity'].keys())[-12:] for month in sorted_months: count = stats['monthly_activity'][month] print(f" {month}: {count:,} segments") def main(): """Main function to run the timeline analysis.""" # Get the script directory and find the repo root script_dir = os.path.dirname(os.path.abspath(__file__)) repo_root = os.path.dirname(script_dir) # Look for Timeline.json starting from repo root timeline_path = find_timeline_json(repo_root) if not timeline_path: print("āŒ Timeline.json file not found!") print(f"Searched in: {repo_root}") sys.exit(1) # Generate output file names with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") stats_output_path = os.path.join(repo_root, f"timeline_statistics_{timestamp}.txt") structure_output_path = os.path.join(repo_root, f"timeline_structure_{timestamp}.txt") try: # Analyze the JSON structure first print("šŸ“‹ Analyzing JSON structure...") analyze_json_structure(timeline_path, structure_output_path) print(f"āœ… JSON structure exported to: {structure_output_path}") # Analyze the data stats = analyze_timeline_data(timeline_path) # Print the results to console print_statistics(stats) # Export statistics to file print(f"\nšŸ“„ Exporting statistics to file...") export_statistics_to_file(stats, stats_output_path) print(f"āœ… Statistics exported to: {stats_output_path}") print(f"\nšŸŽ‰ Analysis complete!") print(f"šŸ“Š Statistics file: {stats_output_path}") print(f"šŸ—ļø Structure file: {structure_output_path}") print(f"šŸ“ Source file: {timeline_path}") except FileNotFoundError: print(f"āŒ Error: Could not find Timeline.json at {timeline_path}") sys.exit(1) except json.JSONDecodeError as e: print(f"āŒ Error: Invalid JSON format in Timeline.json - {e}") sys.exit(1) except Exception as e: print(f"āŒ Error analyzing timeline data: {e}") sys.exit(1) if __name__ == "__main__": main()