Files
personal-tracker/scripts/overview/data-overview.py

446 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Timeline Data Overview Script
This script analyzes the Timeline.json file from Google Location History data
and provides comprehensive statistics about the tracked location data.
"""
import json
import math
import os
import sys
from collections import Counter, defaultdict
from contextlib import redirect_stdout
from datetime import datetime, timedelta
from io import StringIO
from typing import Any, Dict, List, Optional, Tuple
def find_timeline_json(start_path: str) -> Optional[str]:
    """Locate a ``Timeline.json`` file beneath ``start_path``.

    The directory tree is walked top-down with ``os.walk``. Sub-directories
    are visited in sorted order so the result is deterministic when more than
    one ``Timeline.json`` exists in the tree.

    Args:
        start_path: Directory at which the search starts.

    Returns:
        The path of the first ``Timeline.json`` encountered, or ``None`` when
        the tree contains no such file.
    """
    target = 'Timeline.json'
    for root, dirs, files in os.walk(start_path):
        # Sorting in place steers the top-down traversal; otherwise the visit
        # order follows whatever order the filesystem lists entries in.
        dirs.sort()
        if target in files:
            return os.path.join(root, target)
    return None
def parse_datetime(timestamp: str) -> Optional[datetime]:
    """Parse an ISO 8601 timestamp string into a ``datetime``.

    A trailing ``Z`` (UTC designator) is rewritten to ``+00:00`` before
    parsing, because ``datetime.fromisoformat`` only accepts the ``Z`` suffix
    from Python 3.11 onwards.

    Args:
        timestamp: Timestamp text such as ``'2024-01-01T10:00:00Z'``.

    Returns:
        The parsed ``datetime`` (timezone-aware when the text carries an
        offset, naive otherwise), or ``None`` when ``timestamp`` is not a
        string or cannot be parsed.
    """
    # Missing JSON fields may arrive as None; treat anything non-textual as
    # "no timestamp" instead of raising AttributeError.
    if not isinstance(timestamp, str):
        return None
    text = timestamp.strip()
    if text.endswith(('Z', 'z')):
        text = text[:-1] + '+00:00'
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        return None
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance between two points in kilometres.

    Uses the haversine formula with an Earth radius of 6371 km.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.

    Returns:
        The distance along the sphere's surface, in kilometres.
    """
    earth_radius_km = 6371
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2
    half_dlambda = math.radians(lon2 - lon1) / 2
    a = (math.sin(half_dphi) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlambda) ** 2)
    # Floating-point rounding can push `a` marginally above 1 for (near-)
    # antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, a)
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_km * central_angle
def parse_coordinates(point_str: str) -> Tuple[Optional[float], Optional[float]]:
    """Parse a coordinate string like '51.6659027°, -0.4058773°'.

    Degree signs are removed and the text is split on commas; whitespace
    around each component is ignored, so ``'51.5°,-0.4°'`` parses as well as
    ``'51.5°, -0.4°'``.

    Args:
        point_str: Text holding latitude and longitude separated by a comma.

    Returns:
        ``(lat, lon)`` as floats, or ``(None, None)`` when ``point_str`` is
        not a string or does not hold two numeric components.
    """
    try:
        parts = point_str.replace('°', '').split(',')
        # float() tolerates surrounding whitespace, so no explicit strip.
        return float(parts[0]), float(parts[1])
    except (AttributeError, TypeError, ValueError, IndexError):
        # AttributeError/TypeError cover non-string input such as None.
        return None, None
def _record_segment_times(stats: Dict[str, Any], segment: Dict[str, Any]) -> None:
    """Fold one segment's startTime/endTime into the date range, activity counters and duration."""
    start_time = parse_datetime(segment.get('startTime', ''))
    end_time = parse_datetime(segment.get('endTime', ''))
    if not start_time:
        return

    # The date range is tracked on segment start times only.
    date_range = stats['date_range']
    if date_range['earliest'] is None or start_time < date_range['earliest']:
        date_range['earliest'] = start_time
    if date_range['latest'] is None or start_time > date_range['latest']:
        date_range['latest'] = start_time

    stats['monthly_activity'][start_time.strftime('%Y-%m')] += 1
    stats['yearly_activity'][start_time.strftime('%Y')] += 1
    stats['daily_activity'][start_time.strftime('%A')] += 1

    if end_time:
        duration = end_time - start_time
        stats['total_duration_hours'] += duration.total_seconds() / 3600


def _record_visit(stats: Dict[str, Any], visit: Dict[str, Any]) -> None:
    """Fold one ``visit`` entry into the visit, type, place and location statistics."""
    stats['visits'] += 1

    if 'probability' in visit:
        stats['probabilities'].append(visit['probability'])

    top_candidate = visit.get('topCandidate', {})
    if 'semanticType' in top_candidate:
        stats['semantic_types'][top_candidate['semanticType']] += 1
    if 'placeId' in top_candidate:
        stats['place_ids'][top_candidate['placeId']] += 1

    place_location = top_candidate.get('placeLocation', {})
    if 'latLng' in place_location:
        lat, lon = parse_coordinates(place_location['latLng'])
        if lat is not None and lon is not None:
            stats['locations'].append((lat, lon))


def _max_sampled_distance_km(locations: List[tuple], max_samples: int = 150) -> float:
    """Return the largest pairwise distance (km) within an even sample of ``locations``."""
    # Repeated visits to the same place add nothing to the span; drop them
    # (keeping first-seen order) so the sample budget goes to distinct points.
    distinct = list(dict.fromkeys(locations))
    # Stride across the WHOLE list so late entries are represented too, while
    # keeping the quadratic pair loop bounded by max_samples.
    step = max(1, math.ceil(len(distinct) / max_samples))
    sample = distinct[::step]

    farthest = 0.0
    for index, (lat_a, lon_a) in enumerate(sample):
        for lat_b, lon_b in sample[index + 1:]:
            farthest = max(farthest, calculate_distance(lat_a, lon_a, lat_b, lon_b))
    return farthest


def analyze_timeline_data(timeline_path: str) -> Dict[str, Any]:
    """Analyze the timeline data and return comprehensive statistics.

    The file is loaded as JSON and every entry of its ``semanticSegments``
    list is inspected: ``startTime``/``endTime`` feed the date range, the
    per-month/year/weekday counters and the total duration; ``visit`` entries
    feed the probability, semantic-type, place-id and location statistics;
    ``timelinePath`` entries are counted. Progress is printed to stdout.

    Args:
        timeline_path: Path of the Timeline.json file to read.

    Returns:
        A dict with the keys ``total_segments``, ``visits``,
        ``timeline_paths``, ``semantic_types``, ``place_ids``,
        ``monthly_activity``, ``yearly_activity``, ``daily_activity``,
        ``total_duration_hours``, ``date_range``, ``locations`` and
        ``probabilities``. When data is available it also holds
        ``avg_probability``/``min_probability``/``max_probability``,
        ``geographic_bounds`` and ``max_distance_km`` (an approximation
        computed on a sample of the distinct visit locations).

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    print(f"Loading timeline data from: {timeline_path}")
    print("This may take a moment for large files...")
    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    semantic_segments = data.get('semanticSegments', [])

    stats = {
        'total_segments': len(semantic_segments),
        'visits': 0,
        'timeline_paths': 0,
        'semantic_types': Counter(),
        'place_ids': Counter(),
        'monthly_activity': defaultdict(int),
        'yearly_activity': defaultdict(int),
        'daily_activity': defaultdict(int),
        'total_duration_hours': 0,
        'date_range': {'earliest': None, 'latest': None},
        'locations': [],
        'probabilities': [],
    }

    print("Analyzing segments...")
    for i, segment in enumerate(semantic_segments):
        if i % 10000 == 0 and i > 0:
            print(f"Processed {i:,} segments...")
        _record_segment_times(stats, segment)
        if 'visit' in segment:
            _record_visit(stats, segment['visit'])
        if 'timelinePath' in segment:
            stats['timeline_paths'] += 1

    probabilities = stats['probabilities']
    if probabilities:
        stats['avg_probability'] = sum(probabilities) / len(probabilities)
        stats['min_probability'] = min(probabilities)
        stats['max_probability'] = max(probabilities)

    locations = stats['locations']
    if locations:
        lats = [lat for lat, _ in locations]
        lons = [lon for _, lon in locations]
        stats['geographic_bounds'] = {
            'north': max(lats),
            'south': min(lats),
            'east': max(lons),
            'west': min(lons),
        }
        if len(locations) > 1:
            stats['max_distance_km'] = _max_sampled_distance_km(locations)
    return stats
def export_statistics_to_file(stats: Dict[str, Any], output_path: str):
    """Export comprehensive statistics to a text file.

    The report is whatever ``print_statistics`` prints; its output is
    captured by temporarily redirecting ``sys.stdout`` to the file.

    Args:
        stats: Statistics dict to pass to ``print_statistics``.
        output_path: Path of the UTF-8 text file to create or overwrite.
    """
    # redirect_stdout restores the previous sys.stdout even when
    # print_statistics raises, so later messages never land in the file.
    with open(output_path, 'w', encoding='utf-8') as f, redirect_stdout(f):
        print_statistics(stats)
def analyze_json_structure(timeline_path: str, output_path: str):
    """Analyze and export the JSON structure to a text file.

    The report has a root-level summary, a recursive structure outline
    (at most 10 keys per dict, the first item of each list, nesting depth
    limited to 4) and, when the root object has a non-empty
    ``semanticSegments`` entry, segment counts plus the outline of the first
    segment containing ``visit`` and the first containing ``timelinePath``.

    The whole report is assembled in memory before ``output_path`` is opened,
    so a failure during analysis does not leave a partial file behind. A JSON
    root that is not an object (e.g. a list) is reported under the
    placeholder name ``(root)`` instead of raising.

    Args:
        timeline_path: Path of the JSON file to inspect.
        output_path: Path of the UTF-8 text report to create or overwrite.

    Raises:
        OSError: If either file cannot be opened.
        json.JSONDecodeError: If ``timeline_path`` is not valid JSON.
    """
    print(f"Analyzing JSON structure from: {timeline_path}")
    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    def explore_structure(obj, path="", depth=0, max_depth=4):
        """Recursively explore JSON structure and return its outline lines."""
        indent = " " * depth
        if depth > max_depth:
            return [f"{indent}... (max depth reached)"]

        if isinstance(obj, dict):
            lines = [f"{indent}{path} (dict) - {len(obj)} keys:"]
            for position, (key, value) in enumerate(obj.items()):
                if position >= 10:  # Only the first 10 keys are outlined
                    break
                key_path = f"{path}.{key}" if path else key
                if isinstance(value, (dict, list)):
                    lines.extend(explore_structure(value, key_path, depth + 1, max_depth))
                    continue
                # Long strings are shortened to keep the report readable.
                if isinstance(value, str) and len(value) > 50:
                    sample = value[:50] + "..."
                else:
                    sample = str(value)
                lines.append(f"{indent} {key}: {type(value).__name__} = {sample}")
            if len(obj) > 10:
                lines.append(f"{indent} ... and {len(obj) - 10} more keys")
            return lines

        if isinstance(obj, list):
            lines = [f"{indent}{path} (list) - {len(obj)} items:"]
            if obj:
                # The first item stands in for the shape of the whole list.
                lines.append(f"{indent} Sample item structure:")
                lines.extend(explore_structure(obj[0], f"{path}[0]", depth + 1, max_depth))
                if len(obj) > 1:
                    lines.append(f"{indent} ... and {len(obj) - 1} more items")
            return lines

        return [f"{indent}{path}: {type(obj).__name__} = {obj}"]

    rule = "=" * 80
    sub_rule = "-" * 40
    root_is_object = isinstance(data, dict)

    report = [
        rule,
        "TIMELINE JSON STRUCTURE ANALYSIS",
        rule,
        "",
        f"File: {timeline_path}",
        f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
        "ROOT LEVEL STRUCTURE:",
        sub_rule,
    ]

    # A non-object root has no keys of its own; describe it as one entry.
    root_items = data.items() if root_is_object else [("(root)", data)]
    for key, value in root_items:
        if isinstance(value, list):
            report.append(f"{key}: list with {len(value)} items")
        elif isinstance(value, dict):
            report.append(f"{key}: dict with {len(value)} keys")
        else:
            report.append(f"{key}: {type(value).__name__} = {value}")

    report += ["", rule, "DETAILED STRUCTURE:", rule, ""]
    report.extend(explore_structure(data))

    semantic_segments = data.get('semanticSegments', []) if root_is_object else []
    if semantic_segments:
        # One pass yields both the counts and the first example of each kind,
        # so a sample is shown whenever the matching count is non-zero.
        visit_count = path_count = 0
        sample_visit = sample_path = None
        for segment in semantic_segments:
            if 'visit' in segment:
                visit_count += 1
                if sample_visit is None:
                    sample_visit = segment
            if 'timelinePath' in segment:
                path_count += 1
                if sample_path is None:
                    sample_path = segment

        report += [
            "",
            rule,
            "SEMANTIC SEGMENTS ANALYSIS:",
            rule,
            "",
            f"Total semantic segments: {len(semantic_segments)}",
            "",
            f"Segments with visits: {visit_count}",
            f"Segments with timeline paths: {path_count}",
            "",
        ]
        if sample_visit:
            report += ["SAMPLE VISIT STRUCTURE:", sub_rule]
            report.extend(explore_structure(sample_visit, "sample_visit"))
            report.append("")
        if sample_path:
            report += ["SAMPLE TIMELINE PATH STRUCTURE:", sub_rule]
            report.extend(explore_structure(sample_path, "sample_timelinePath"))

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("\n".join(report) + "\n")
def print_statistics(stats: Dict[str, Any]):
    """Print comprehensive statistics in a readable format.

    Sections are written to stdout in a fixed order; every section except
    the basic counts is skipped when its data is missing or empty. The
    "Average per day" line needs the date range and is omitted when the
    range is unknown.

    Args:
        stats: Statistics dict. The keys ``total_segments``, ``visits``,
            ``timeline_paths``, ``place_ids``, ``date_range``,
            ``total_duration_hours``, ``semantic_types``,
            ``yearly_activity``, ``daily_activity`` and ``monthly_activity``
            are required; the probability and geographic keys are optional.
    """
    visits = stats['visits']

    def share_of_visits(count):
        # Percentage of all visits; 0 when there are no visits at all.
        return (count / visits) * 100 if visits > 0 else 0

    print("\n" + "="*80)
    print("TIMELINE DATA OVERVIEW")
    print("="*80)

    # Basic counts
    print("\n📊 BASIC STATISTICS")
    print(f" Total segments: {stats['total_segments']:,}")
    print(f" Visits: {visits:,}")
    print(f" Timeline paths: {stats['timeline_paths']:,}")
    print(f" Unique places: {len(stats['place_ids']):,}")

    # Date range. total_days is bound up front because the duration section
    # reads it even when no date range is available.
    total_days = None
    earliest = stats['date_range']['earliest']
    latest = stats['date_range']['latest']
    if earliest and latest:
        total_days = (latest - earliest).days
        print("\n📅 DATE RANGE")
        print(f" Earliest record: {earliest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Latest record: {latest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Total span: {total_days:,} days ({total_days/365.25:.1f} years)")

    # Duration statistics
    total_hours = stats['total_duration_hours']
    if total_hours > 0:
        print("\n⏰ DURATION STATISTICS")
        print(f" Total tracked time: {total_hours:,.1f} hours")
        if total_days is not None:
            # A span shorter than one day counts as one day to avoid
            # dividing by zero.
            print(f" Average per day: {total_hours / max(total_days, 1):.1f} hours")

    # Probability statistics
    if 'avg_probability' in stats:
        print("\n🎯 PROBABILITY STATISTICS")
        print(f" Average confidence: {stats['avg_probability']:.3f}")
        print(f" Min confidence: {stats['min_probability']:.3f}")
        print(f" Max confidence: {stats['max_probability']:.3f}")

    # Geographic statistics
    if 'geographic_bounds' in stats:
        bounds = stats['geographic_bounds']
        print("\n🌍 GEOGRAPHIC STATISTICS")
        print(f" Northern bound: {bounds['north']:.6f}°")
        print(f" Southern bound: {bounds['south']:.6f}°")
        print(f" Eastern bound: {bounds['east']:.6f}°")
        print(f" Western bound: {bounds['west']:.6f}°")
        if 'max_distance_km' in stats:
            print(f" Max distance span: {stats['max_distance_km']:.1f} km")

    # Top semantic types
    if stats['semantic_types']:
        print("\n🏷️ TOP LOCATION TYPES")
        for semantic_type, count in stats['semantic_types'].most_common(10):
            print(f" {semantic_type:<15}: {count:,} ({share_of_visits(count):.1f}%)")

    # Top places (ids are cut to 30 characters to keep the columns aligned)
    if stats['place_ids']:
        print("\n📍 TOP VISITED PLACES")
        for rank, (place_id, count) in enumerate(stats['place_ids'].most_common(5), start=1):
            print(f" #{rank:<2} {place_id[:30]:<30}: {count:,} visits ({share_of_visits(count):.1f}%)")

    # Yearly activity
    if stats['yearly_activity']:
        print("\n📈 ACTIVITY BY YEAR")
        for year in sorted(stats['yearly_activity']):
            print(f" {year}: {stats['yearly_activity'][year]:,} segments")

    # Daily patterns, in calendar order; days without activity are skipped
    if stats['daily_activity']:
        print("\n📆 ACTIVITY BY DAY OF WEEK")
        days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        for day in days_order:
            count = stats['daily_activity'].get(day, 0)
            if count > 0:
                print(f" {day:<10}: {count:,} segments")

    # Monthly activity (show recent 12 months)
    if stats['monthly_activity']:
        print("\n📊 RECENT MONTHLY ACTIVITY")
        for month in sorted(stats['monthly_activity'])[-12:]:
            print(f" {month}: {stats['monthly_activity'][month]:,} segments")
def main():
    """Run the complete timeline analysis from the command line.

    Finds Timeline.json, writes a structure report and a statistics report
    (both named with the current timestamp) next to the search root, and
    prints the statistics to the console. Exits with status 1 when the file
    is missing or any step of the analysis fails.
    """
    # The search root is the parent of the directory holding this script.
    # NOTE(review): that is only one level up - confirm it really is the
    # repository root for this project layout.
    repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

    timeline_path = find_timeline_json(repo_root)
    if not timeline_path:
        print("❌ Timeline.json file not found!")
        print(f"Searched in: {repo_root}")
        sys.exit(1)

    # Both reports share one timestamp so they can be matched up later.
    run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stats_output_path = os.path.join(repo_root, f"timeline_statistics_{run_stamp}.txt")
    structure_output_path = os.path.join(repo_root, f"timeline_structure_{run_stamp}.txt")

    try:
        # Structure report first, then the statistics.
        print("📋 Analyzing JSON structure...")
        analyze_json_structure(timeline_path, structure_output_path)
        print(f"✅ JSON structure exported to: {structure_output_path}")

        stats = analyze_timeline_data(timeline_path)
        print_statistics(stats)

        print("\n📄 Exporting statistics to file...")
        export_statistics_to_file(stats, stats_output_path)
        print(f"✅ Statistics exported to: {stats_output_path}")

        print("\n🎉 Analysis complete!")
        print(f"📊 Statistics file: {stats_output_path}")
        print(f"🏗️ Structure file: {structure_output_path}")
        print(f"📁 Source file: {timeline_path}")
    except FileNotFoundError:
        failure = f"❌ Error: Could not find Timeline.json at {timeline_path}"
    except json.JSONDecodeError as exc:
        failure = f"❌ Error: Invalid JSON format in Timeline.json - {exc}"
    except Exception as exc:
        # Top-level boundary: report any other failure and exit non-zero.
        failure = f"❌ Error analyzing timeline data: {exc}"
    else:
        return

    print(failure)
    sys.exit(1)
# Run the analysis only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()