# Files
# personal-tracker/scripts/data-overview.py
#
# 300 lines
# 12 KiB
# Python
#!/usr/bin/env python3
"""
Timeline Data Overview Script
This script analyzes the Timeline.json file from Google Location History data
and provides comprehensive statistics about the tracked location data.
"""
import json
import math
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from io import StringIO
from typing import Any, Dict, List, Optional
def find_timeline_json(start_path: str) -> Optional[str]:
    """Find the Timeline.json file starting from the given path.

    Walks the directory tree rooted at start_path and returns the full
    path of the first Timeline.json encountered, or None if no such
    file exists anywhere under start_path.

    Args:
        start_path: Directory to search recursively.

    Returns:
        Absolute/relative path to Timeline.json, or None when not found.
    """
    # os.walk is top-down by default, so a Timeline.json closer to the
    # root wins over one buried deeper in the tree.
    for root, dirs, files in os.walk(start_path):
        if 'Timeline.json' in files:
            return os.path.join(root, 'Timeline.json')
    return None
def parse_datetime(timestamp: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string to a datetime object.

    Google Timeline exports use a trailing 'Z' for UTC, which
    datetime.fromisoformat() rejects before Python 3.11, so the 'Z' is
    normalized to an explicit '+00:00' offset first.  (The original
    code stripped the 'Z' and then replaced it again — the second step
    was a no-op; a single replace covers both cases.)

    Args:
        timestamp: ISO-format timestamp, optionally ending in 'Z'.

    Returns:
        A datetime (timezone-aware when an offset/'Z' is present), or
        None when the string cannot be parsed.
    """
    try:
        return datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
    except ValueError:
        return None
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Calculate distance between two points using Haversine formula (in kilometers).

    Args:
        lat1, lon1: First point, in decimal degrees.
        lat2, lon2: Second point, in decimal degrees.

    Returns:
        Great-circle distance in kilometres (mean Earth radius model).
    """
    earth_radius_km = 6371
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2
    half_dlambda = math.radians(lon2 - lon1) / 2
    # Haversine term: sin²(Δφ/2) + cos(φ1)·cos(φ2)·sin²(Δλ/2)
    sin_dphi = math.sin(half_dphi)
    sin_dlambda = math.sin(half_dlambda)
    a = sin_dphi * sin_dphi + math.cos(phi1) * math.cos(phi2) * sin_dlambda * sin_dlambda
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_km * central_angle
def parse_coordinates(point_str: str) -> tuple:
    """Parse coordinate string like '51.6659027°, -0.4058773°' to (lat, lon).

    The original implementation split on the exact two-character
    separator ', ', so a string without the space ('51°,-0.4°') failed.
    Splitting on ',' and stripping each part accepts both forms while
    remaining backward-compatible.

    Args:
        point_str: latLng string with optional degree symbols.

    Returns:
        (lat, lon) as floats, or (None, None) when parsing fails.
    """
    try:
        lat_str, lon_str = point_str.replace('°', '').split(',')
        return float(lat_str.strip()), float(lon_str.strip())
    except (ValueError, IndexError):
        # ValueError: wrong number of parts or non-numeric content.
        return None, None
def analyze_timeline_data(timeline_path: str) -> Dict[str, Any]:
    """Analyze the timeline data and return comprehensive statistics.

    Loads the JSON file at timeline_path, iterates its
    'semanticSegments' list, and accumulates counts, the date range,
    total duration, visit metadata (probabilities, semantic types,
    place IDs) and coordinates into a single stats dict consumed by
    print_statistics().
    """
    print(f"Loading timeline data from: {timeline_path}")
    print("This may take a moment for large files...")
    # NOTE(review): loads the entire export into memory at once; fine
    # for typical exports but worth knowing for very large files.
    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    semantic_segments = data.get('semanticSegments', [])
    # Initialize counters and collections
    stats = {
        'total_segments': len(semantic_segments),
        'visits': 0,
        'timeline_paths': 0,
        'semantic_types': Counter(),
        'place_ids': Counter(),
        'monthly_activity': defaultdict(int),
        'yearly_activity': defaultdict(int),
        'daily_activity': defaultdict(int),
        'total_duration_hours': 0,
        'date_range': {'earliest': None, 'latest': None},
        'locations': [],
        'probabilities': [],
    }
    print("Analyzing segments...")
    for i, segment in enumerate(semantic_segments):
        # Progress indicator for large exports.
        if i % 10000 == 0 and i > 0:
            print(f"Processed {i:,} segments...")
        # Parse timestamps
        start_time = parse_datetime(segment.get('startTime', ''))
        end_time = parse_datetime(segment.get('endTime', ''))
        if start_time:
            # Update date range.
            # NOTE(review): comparing naive and tz-aware datetimes would
            # raise TypeError — assumes all timestamps in one export are
            # of the same kind; confirm against real data.
            if stats['date_range']['earliest'] is None or start_time < stats['date_range']['earliest']:
                stats['date_range']['earliest'] = start_time
            if stats['date_range']['latest'] is None or start_time > stats['date_range']['latest']:
                stats['date_range']['latest'] = start_time
            # Monthly and yearly activity
            month_key = start_time.strftime('%Y-%m')
            year_key = start_time.strftime('%Y')
            day_key = start_time.strftime('%A')  # weekday name, e.g. 'Monday'
            stats['monthly_activity'][month_key] += 1
            stats['yearly_activity'][year_key] += 1
            stats['daily_activity'][day_key] += 1
        # Calculate duration
        if start_time and end_time:
            duration = end_time - start_time
            stats['total_duration_hours'] += duration.total_seconds() / 3600
        # Analyze visits
        if 'visit' in segment:
            stats['visits'] += 1
            visit = segment['visit']
            # Probability analysis
            if 'probability' in visit:
                stats['probabilities'].append(visit['probability'])
            # Semantic types
            top_candidate = visit.get('topCandidate', {})
            if 'semanticType' in top_candidate:
                semantic_type = top_candidate['semanticType']
                stats['semantic_types'][semantic_type] += 1
            # Place IDs
            if 'placeId' in top_candidate:
                place_id = top_candidate['placeId']
                stats['place_ids'][place_id] += 1
            # Location coordinates
            place_location = top_candidate.get('placeLocation', {})
            if 'latLng' in place_location:
                lat, lon = parse_coordinates(place_location['latLng'])
                if lat is not None and lon is not None:
                    stats['locations'].append((lat, lon))
        # Analyze timeline paths
        if 'timelinePath' in segment:
            stats['timeline_paths'] += 1
    # Calculate additional statistics
    if stats['probabilities']:
        stats['avg_probability'] = sum(stats['probabilities']) / len(stats['probabilities'])
        stats['min_probability'] = min(stats['probabilities'])
        stats['max_probability'] = max(stats['probabilities'])
    if stats['locations']:
        # Find geographic bounds
        lats = [loc[0] for loc in stats['locations']]
        lons = [loc[1] for loc in stats['locations']]
        stats['geographic_bounds'] = {
            'north': max(lats),
            'south': min(lats),
            'east': max(lons),
            'west': min(lons)
        }
        # Calculate approximate geographic span.
        # Only every 10th of the first 1000 points is compared, so this
        # is an approximation of the true maximum pairwise distance.
        if len(stats['locations']) > 1:
            max_distance = 0
            for i in range(0, min(len(stats['locations']), 1000), 10):  # Sample for performance
                for j in range(i + 1, min(len(stats['locations']), 1000), 10):
                    dist = calculate_distance(
                        stats['locations'][i][0], stats['locations'][i][1],
                        stats['locations'][j][0], stats['locations'][j][1]
                    )
                    max_distance = max(max_distance, dist)
            stats['max_distance_km'] = max_distance
    return stats
def print_statistics(stats: Dict[str, Any]):
    """Print comprehensive statistics in a readable format.

    Args:
        stats: The statistics mapping produced by analyze_timeline_data().
    """
    print("\n" + "="*80)
    print("TIMELINE DATA OVERVIEW")
    print("="*80)
    # Basic counts
    print(f"\n📊 BASIC STATISTICS")
    print(f" Total segments: {stats['total_segments']:,}")
    print(f" Visits: {stats['visits']:,}")
    print(f" Timeline paths: {stats['timeline_paths']:,}")
    print(f" Unique places: {len(stats['place_ids']):,}")
    # BUG FIX: total_days was previously only bound inside the date-range
    # branch, so the duration section below raised NameError whenever a
    # file had durations but no parseable date range. Default it to 0.
    total_days = 0
    # Date range
    if stats['date_range']['earliest'] and stats['date_range']['latest']:
        earliest = stats['date_range']['earliest']
        latest = stats['date_range']['latest']
        total_days = (latest - earliest).days
        print(f"\n📅 DATE RANGE")
        print(f" Earliest record: {earliest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Latest record: {latest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Total span: {total_days:,} days ({total_days/365.25:.1f} years)")
    # Duration statistics
    if stats['total_duration_hours'] > 0:
        print(f"\n⏰ DURATION STATISTICS")
        print(f" Total tracked time: {stats['total_duration_hours']:,.1f} hours")
        # max(total_days, 1) avoids division by zero for sub-day spans.
        print(f" Average per day: {stats['total_duration_hours'] / max(total_days, 1):.1f} hours")
    # Probability statistics (only present when at least one visit had one)
    if 'avg_probability' in stats:
        print(f"\n🎯 PROBABILITY STATISTICS")
        print(f" Average confidence: {stats['avg_probability']:.3f}")
        print(f" Min confidence: {stats['min_probability']:.3f}")
        print(f" Max confidence: {stats['max_probability']:.3f}")
    # Geographic statistics (only present when coordinates were found)
    if 'geographic_bounds' in stats:
        bounds = stats['geographic_bounds']
        print(f"\n🌍 GEOGRAPHIC STATISTICS")
        print(f" Northern bound: {bounds['north']:.6f}°")
        print(f" Southern bound: {bounds['south']:.6f}°")
        print(f" Eastern bound: {bounds['east']:.6f}°")
        print(f" Western bound: {bounds['west']:.6f}°")
        if 'max_distance_km' in stats:
            print(f" Max distance span: {stats['max_distance_km']:.1f} km")
    # Top semantic types
    if stats['semantic_types']:
        print(f"\n🏷️ TOP LOCATION TYPES")
        for semantic_type, count in stats['semantic_types'].most_common(10):
            percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0
            print(f" {semantic_type:<15}: {count:,} ({percentage:.1f}%)")
    # Top places
    if stats['place_ids']:
        print(f"\n📍 TOP VISITED PLACES")
        for i, (place_id, count) in enumerate(stats['place_ids'].most_common(5)):
            percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0
            print(f" #{i+1:<2} {place_id[:30]:<30}: {count:,} visits ({percentage:.1f}%)")
    # Yearly activity
    if stats['yearly_activity']:
        print(f"\n📈 ACTIVITY BY YEAR")
        for year in sorted(stats['yearly_activity'].keys()):
            count = stats['yearly_activity'][year]
            print(f" {year}: {count:,} segments")
    # Daily patterns
    if stats['daily_activity']:
        print(f"\n📆 ACTIVITY BY DAY OF WEEK")
        days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        for day in days_order:
            count = stats['daily_activity'].get(day, 0)
            if count > 0:
                print(f" {day:<10}: {count:,} segments")
    # Monthly activity (show recent 12 months)
    if stats['monthly_activity']:
        print(f"\n📊 RECENT MONTHLY ACTIVITY")
        sorted_months = sorted(stats['monthly_activity'].keys())[-12:]
        for month in sorted_months:
            count = stats['monthly_activity'][month]
            print(f" {month}: {count:,} segments")
def main():
    """Entry point: locate Timeline.json under the repo root and report on it."""
    # The repo root is taken to be the parent of this scripts/ directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    repo_root = os.path.dirname(script_dir)
    # Look for Timeline.json starting from repo root
    timeline_path = find_timeline_json(repo_root)
    if not timeline_path:
        print("❌ Timeline.json file not found!")
        print(f"Searched in: {repo_root}")
        sys.exit(1)
    try:
        # Analyze the data, then render the results.
        print_statistics(analyze_timeline_data(timeline_path))
        print(f"\n✅ Analysis complete! File analyzed: {timeline_path}")
    except FileNotFoundError:
        print(f"❌ Error: Could not find Timeline.json at {timeline_path}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"❌ Error: Invalid JSON format in Timeline.json - {e}")
        sys.exit(1)
    except Exception as e:
        # Catch-all boundary: report and exit non-zero rather than traceback.
        print(f"❌ Error analyzing timeline data: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()