moved overview script
This commit is contained in:
446
scripts/overview/data-overview.py
Normal file
446
scripts/overview/data-overview.py
Normal file
@@ -0,0 +1,446 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Timeline Data Overview Script
|
||||
|
||||
This script analyzes the Timeline.json file from Google Location History data
|
||||
and provides comprehensive statistics about the tracked location data.
|
||||
"""
|
||||
|
||||
import json
import math
import os
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from io import StringIO
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
def find_timeline_json(start_path: str) -> Optional[str]:
    """Locate a Timeline.json file by walking the directory tree.

    Args:
        start_path: Directory to start the recursive search from.

    Returns:
        Path to the first ``Timeline.json`` encountered in ``os.walk``
        order, or ``None`` when no such file exists under ``start_path``.
    """
    for root, _dirs, files in os.walk(start_path):
        if 'Timeline.json' in files:
            return os.path.join(root, 'Timeline.json')
    return None
|
||||
|
||||
def parse_datetime(timestamp: str) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp string into a datetime.

    Google Takeout timestamps may end in a literal ``'Z'`` (UTC), which
    ``datetime.fromisoformat`` only accepts natively from Python 3.11
    onward; normalize it to ``'+00:00'`` first for compatibility.
    The original code stripped the ``'Z'`` and then redundantly ran
    ``.replace('Z', '+00:00')`` on a string that could no longer
    contain one — collapsed here into a single normalization step.

    Args:
        timestamp: ISO format string, e.g. ``'2021-05-01T12:00:00Z'``.

    Returns:
        A (possibly timezone-aware) datetime, or ``None`` when the
        string cannot be parsed (including the empty string).
    """
    if timestamp.endswith('Z'):
        timestamp = timestamp[:-1] + '+00:00'
    try:
        return datetime.fromisoformat(timestamp)
    except ValueError:
        return None
|
||||
|
||||
def calculate_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance between two points in kilometers.

    Uses the Haversine formula on a spherical Earth model (radius 6371 km),
    which is accurate to within ~0.5% for location-history purposes.

    Args:
        lat1, lon1: First point, in decimal degrees.
        lat2, lon2: Second point, in decimal degrees.

    Returns:
        Distance along the sphere's surface, in kilometers.
    """
    earth_radius_km = 6371

    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)

    # Haversine of the central angle between the two points.
    half_chord_sq = (
        math.sin(d_phi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2
    )
    central_angle = 2 * math.atan2(math.sqrt(half_chord_sq), math.sqrt(1 - half_chord_sq))

    return earth_radius_km * central_angle
|
||||
|
||||
def parse_coordinates(point_str: str) -> Tuple[Optional[float], Optional[float]]:
    """Parse a coordinate string like ``'51.6659027°, -0.4058773°'``.

    Strips the degree signs first and splits on a bare comma, so both
    ``'51.1°, -0.4°'`` and the space-less ``'51.1°,-0.4°'`` parse
    (the original ``split(', ')`` silently failed on the latter).

    Args:
        point_str: ``latLng``-style string from the timeline export.

    Returns:
        ``(lat, lon)`` as floats, or ``(None, None)`` when the string
        is malformed or has fewer than two comma-separated fields.
    """
    try:
        parts = point_str.replace('°', '').split(',')
        return float(parts[0]), float(parts[1])
    except (ValueError, IndexError):
        return None, None
|
||||
|
||||
def analyze_timeline_data(timeline_path: str) -> Dict[str, Any]:
    """Analyze the timeline data and return comprehensive statistics.

    Loads the whole Timeline.json into memory, then makes a single pass
    over its 'semanticSegments' list accumulating counts, date ranges,
    durations, place/semantic-type frequencies and visit coordinates.
    Prints progress to stdout while running.

    Args:
        timeline_path: Path to a Google Location History Timeline.json.

    Returns:
        A dict of statistics. Keys 'avg_probability'/'min_probability'/
        'max_probability', 'geographic_bounds' and 'max_distance_km' are
        only present when the corresponding data was found.
    """
    print(f"Loading timeline data from: {timeline_path}")
    print("This may take a moment for large files...")

    # NOTE: json.load reads the entire file into memory at once.
    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    semantic_segments = data.get('semanticSegments', [])

    # Initialize counters and collections
    stats = {
        'total_segments': len(semantic_segments),
        'visits': 0,
        'timeline_paths': 0,
        'semantic_types': Counter(),
        'place_ids': Counter(),
        'monthly_activity': defaultdict(int),
        'yearly_activity': defaultdict(int),
        'daily_activity': defaultdict(int),
        'total_duration_hours': 0,
        'date_range': {'earliest': None, 'latest': None},
        'locations': [],          # (lat, lon) tuples of visit places
        'probabilities': [],      # per-visit confidence values
    }

    print("Analyzing segments...")

    for i, segment in enumerate(semantic_segments):
        # Progress indicator for large exports.
        if i % 10000 == 0 and i > 0:
            print(f"Processed {i:,} segments...")

        # Parse timestamps (parse_datetime returns None on failure)
        start_time = parse_datetime(segment.get('startTime', ''))
        end_time = parse_datetime(segment.get('endTime', ''))

        if start_time:
            # Update date range (segment end times are not considered)
            if stats['date_range']['earliest'] is None or start_time < stats['date_range']['earliest']:
                stats['date_range']['earliest'] = start_time
            if stats['date_range']['latest'] is None or start_time > stats['date_range']['latest']:
                stats['date_range']['latest'] = start_time

            # Monthly, yearly and day-of-week activity keyed by start time
            month_key = start_time.strftime('%Y-%m')
            year_key = start_time.strftime('%Y')
            day_key = start_time.strftime('%A')

            stats['monthly_activity'][month_key] += 1
            stats['yearly_activity'][year_key] += 1
            stats['daily_activity'][day_key] += 1

        # Calculate duration (only when both ends parsed successfully)
        if start_time and end_time:
            duration = end_time - start_time
            stats['total_duration_hours'] += duration.total_seconds() / 3600

        # Analyze visits
        if 'visit' in segment:
            stats['visits'] += 1
            visit = segment['visit']

            # Probability analysis (visit confidence, when present)
            if 'probability' in visit:
                stats['probabilities'].append(visit['probability'])

            # Semantic types of the top place candidate (e.g. HOME/WORK)
            top_candidate = visit.get('topCandidate', {})
            if 'semanticType' in top_candidate:
                semantic_type = top_candidate['semanticType']
                stats['semantic_types'][semantic_type] += 1

            # Place IDs (frequency of each visited place)
            if 'placeId' in top_candidate:
                place_id = top_candidate['placeId']
                stats['place_ids'][place_id] += 1

            # Location coordinates of the candidate place
            place_location = top_candidate.get('placeLocation', {})
            if 'latLng' in place_location:
                lat, lon = parse_coordinates(place_location['latLng'])
                if lat is not None and lon is not None:
                    stats['locations'].append((lat, lon))

        # Analyze timeline paths (movement segments, counted only)
        if 'timelinePath' in segment:
            stats['timeline_paths'] += 1

    # Calculate additional statistics derived from the accumulated data
    if stats['probabilities']:
        stats['avg_probability'] = sum(stats['probabilities']) / len(stats['probabilities'])
        stats['min_probability'] = min(stats['probabilities'])
        stats['max_probability'] = max(stats['probabilities'])

    if stats['locations']:
        # Find geographic bounds (axis-aligned bounding box)
        lats = [loc[0] for loc in stats['locations']]
        lons = [loc[1] for loc in stats['locations']]
        stats['geographic_bounds'] = {
            'north': max(lats),
            'south': min(lats),
            'east': max(lons),
            'west': min(lons)
        }

        # Calculate approximate geographic span. Only every 10th of the
        # first 1000 locations is compared pairwise, so this is a lower
        # bound on the true maximum distance, not an exact value.
        if len(stats['locations']) > 1:
            max_distance = 0
            for i in range(0, min(len(stats['locations']), 1000), 10):  # Sample for performance
                for j in range(i + 1, min(len(stats['locations']), 1000), 10):
                    dist = calculate_distance(
                        stats['locations'][i][0], stats['locations'][i][1],
                        stats['locations'][j][0], stats['locations'][j][1]
                    )
                    max_distance = max(max_distance, dist)
            stats['max_distance_km'] = max_distance

    return stats
|
||||
|
||||
def export_statistics_to_file(stats: Dict[str, Any], output_path: str):
    """Export comprehensive statistics to a text file.

    Runs print_statistics with stdout redirected into the file.
    Using contextlib.redirect_stdout guarantees stdout is restored even
    if print_statistics raises — the previous manual
    ``sys.stdout = f`` / restore sequence leaked the redirection on any
    exception, silencing all subsequent console output.

    Args:
        stats: Statistics mapping produced by analyze_timeline_data.
        output_path: Destination text file (overwritten, UTF-8).
    """
    # Local import keeps the module-level dependency list unchanged.
    from contextlib import redirect_stdout

    with open(output_path, 'w', encoding='utf-8') as f, redirect_stdout(f):
        print_statistics(stats)
|
||||
|
||||
def analyze_json_structure(timeline_path: str, output_path: str):
    """Analyze and export the JSON structure to a text file.

    Loads the whole Timeline.json, then writes a human-readable report:
    root-level keys, a depth-limited recursive structure dump, and — when
    'semanticSegments' exist — counts plus one sample visit and one
    sample timelinePath segment taken from the first 100 segments.

    Args:
        timeline_path: Path to the Timeline.json to inspect.
        output_path: Destination report file (overwritten, UTF-8).
    """
    print(f"Analyzing JSON structure from: {timeline_path}")

    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    def explore_structure(obj, path="", depth=0, max_depth=4):
        """Recursively describe a JSON value as indented text lines.

        Dicts show at most 10 keys; lists show only their first item as
        a representative sample; recursion stops past max_depth.
        """
        indent = " " * depth
        structure_info = []

        if depth > max_depth:
            return [f"{indent}... (max depth reached)"]

        if isinstance(obj, dict):
            structure_info.append(f"{indent}{path} (dict) - {len(obj)} keys:")
            for key, value in list(obj.items())[:10]:  # Limit to first 10 keys
                key_path = f"{path}.{key}" if path else key
                if isinstance(value, (dict, list)):
                    structure_info.extend(explore_structure(value, key_path, depth + 1, max_depth))
                else:
                    value_type = type(value).__name__
                    # Truncate long scalar strings so lines stay readable.
                    if isinstance(value, str) and len(value) > 50:
                        sample = value[:50] + "..."
                    else:
                        sample = str(value)
                    structure_info.append(f"{indent} {key}: {value_type} = {sample}")

            if len(obj) > 10:
                structure_info.append(f"{indent} ... and {len(obj) - 10} more keys")

        elif isinstance(obj, list):
            structure_info.append(f"{indent}{path} (list) - {len(obj)} items:")
            if obj:
                structure_info.append(f"{indent} Sample item structure:")
                structure_info.extend(explore_structure(obj[0], f"{path}[0]", depth + 1, max_depth))
                if len(obj) > 1:
                    structure_info.append(f"{indent} ... and {len(obj) - 1} more items")

        else:
            # Scalar leaf (str/int/float/bool/None).
            value_type = type(obj).__name__
            structure_info.append(f"{indent}{path}: {value_type} = {obj}")

        return structure_info

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("="*80 + "\n")
        f.write("TIMELINE JSON STRUCTURE ANALYSIS\n")
        f.write("="*80 + "\n\n")

        f.write(f"File: {timeline_path}\n")
        f.write(f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

        # Overall structure: one summary line per root-level key.
        f.write("ROOT LEVEL STRUCTURE:\n")
        f.write("-" * 40 + "\n")
        for key, value in data.items():
            if isinstance(value, list):
                f.write(f"{key}: list with {len(value)} items\n")
            elif isinstance(value, dict):
                f.write(f"{key}: dict with {len(value)} keys\n")
            else:
                f.write(f"{key}: {type(value).__name__} = {value}\n")

        f.write("\n" + "="*80 + "\n")
        f.write("DETAILED STRUCTURE:\n")
        f.write("="*80 + "\n\n")

        # Detailed structure analysis (depth-limited recursive dump)
        structure_lines = explore_structure(data)
        for line in structure_lines:
            f.write(line + "\n")

        # Sample semantic segment analysis
        semantic_segments = data.get('semanticSegments', [])
        if semantic_segments:
            f.write("\n" + "="*80 + "\n")
            f.write("SEMANTIC SEGMENTS ANALYSIS:\n")
            f.write("="*80 + "\n\n")

            f.write(f"Total semantic segments: {len(semantic_segments)}\n\n")

            # Analyze different types of segments
            visit_count = sum(1 for seg in semantic_segments if 'visit' in seg)
            path_count = sum(1 for seg in semantic_segments if 'timelinePath' in seg)

            f.write(f"Segments with visits: {visit_count}\n")
            f.write(f"Segments with timeline paths: {path_count}\n\n")

            # Find one representative visit segment and one path segment.
            sample_visit = None
            sample_path = None

            for segment in semantic_segments[:100]:  # Check first 100 segments
                if 'visit' in segment and sample_visit is None:
                    sample_visit = segment
                if 'timelinePath' in segment and sample_path is None:
                    sample_path = segment
                if sample_visit and sample_path:
                    break

            if sample_visit:
                f.write("SAMPLE VISIT STRUCTURE:\n")
                f.write("-" * 40 + "\n")
                visit_structure = explore_structure(sample_visit, "sample_visit")
                for line in visit_structure:
                    f.write(line + "\n")
                f.write("\n")

            if sample_path:
                f.write("SAMPLE TIMELINE PATH STRUCTURE:\n")
                f.write("-" * 40 + "\n")
                path_structure = explore_structure(sample_path, "sample_timelinePath")
                for line in path_structure:
                    f.write(line + "\n")
|
||||
|
||||
def print_statistics(stats: Dict[str, Any]):
    """Print comprehensive statistics in a readable format.

    Each section is emitted only when the corresponding data exists in
    ``stats`` (as produced by analyze_timeline_data). Output goes to
    stdout so export_statistics_to_file can redirect it to a file.

    Args:
        stats: Statistics mapping produced by analyze_timeline_data.
    """
    print("\n" + "="*80)
    print("TIMELINE DATA OVERVIEW")
    print("="*80)

    # Basic counts
    print(f"\n📊 BASIC STATISTICS")
    print(f" Total segments: {stats['total_segments']:,}")
    print(f" Visits: {stats['visits']:,}")
    print(f" Timeline paths: {stats['timeline_paths']:,}")
    print(f" Unique places: {len(stats['place_ids']):,}")

    # BUGFIX: total_days was previously bound only inside the date-range
    # branch but read in the duration branch below, raising NameError
    # whenever durations existed without a parsed date range.
    total_days = 0

    # Date range
    if stats['date_range']['earliest'] and stats['date_range']['latest']:
        earliest = stats['date_range']['earliest']
        latest = stats['date_range']['latest']
        total_days = (latest - earliest).days
        print(f"\n📅 DATE RANGE")
        print(f" Earliest record: {earliest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Latest record: {latest.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Total span: {total_days:,} days ({total_days/365.25:.1f} years)")

    # Duration statistics (max(..., 1) guards against division by zero)
    if stats['total_duration_hours'] > 0:
        print(f"\n⏰ DURATION STATISTICS")
        print(f" Total tracked time: {stats['total_duration_hours']:,.1f} hours")
        print(f" Average per day: {stats['total_duration_hours'] / max(total_days, 1):.1f} hours")

    # Probability statistics (keys present only when visits had probabilities)
    if 'avg_probability' in stats:
        print(f"\n🎯 PROBABILITY STATISTICS")
        print(f" Average confidence: {stats['avg_probability']:.3f}")
        print(f" Min confidence: {stats['min_probability']:.3f}")
        print(f" Max confidence: {stats['max_probability']:.3f}")

    # Geographic statistics
    if 'geographic_bounds' in stats:
        bounds = stats['geographic_bounds']
        print(f"\n🌍 GEOGRAPHIC STATISTICS")
        print(f" Northern bound: {bounds['north']:.6f}°")
        print(f" Southern bound: {bounds['south']:.6f}°")
        print(f" Eastern bound: {bounds['east']:.6f}°")
        print(f" Western bound: {bounds['west']:.6f}°")
        if 'max_distance_km' in stats:
            print(f" Max distance span: {stats['max_distance_km']:.1f} km")

    # Top semantic types (share is relative to total visits)
    if stats['semantic_types']:
        print(f"\n🏷️ TOP LOCATION TYPES")
        for semantic_type, count in stats['semantic_types'].most_common(10):
            percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0
            print(f" {semantic_type:<15}: {count:,} ({percentage:.1f}%)")

    # Top places (place IDs truncated to 30 chars for alignment)
    if stats['place_ids']:
        print(f"\n📍 TOP VISITED PLACES")
        for i, (place_id, count) in enumerate(stats['place_ids'].most_common(5)):
            percentage = (count / stats['visits']) * 100 if stats['visits'] > 0 else 0
            print(f" #{i+1:<2} {place_id[:30]:<30}: {count:,} visits ({percentage:.1f}%)")

    # Yearly activity
    if stats['yearly_activity']:
        print(f"\n📈 ACTIVITY BY YEAR")
        for year in sorted(stats['yearly_activity'].keys()):
            count = stats['yearly_activity'][year]
            print(f" {year}: {count:,} segments")

    # Daily patterns, printed in calendar order rather than count order
    if stats['daily_activity']:
        print(f"\n📆 ACTIVITY BY DAY OF WEEK")
        days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
        for day in days_order:
            count = stats['daily_activity'].get(day, 0)
            if count > 0:
                print(f" {day:<10}: {count:,} segments")

    # Monthly activity (show recent 12 months)
    if stats['monthly_activity']:
        print(f"\n📊 RECENT MONTHLY ACTIVITY")
        sorted_months = sorted(stats['monthly_activity'].keys())[-12:]
        for month in sorted_months:
            count = stats['monthly_activity'][month]
            print(f" {month}: {count:,} segments")
|
||||
|
||||
def main():
    """Entry point: locate Timeline.json, analyze it, and export reports."""
    # The repo root is assumed to be the parent of this script's directory.
    this_dir = os.path.dirname(os.path.abspath(__file__))
    root_dir = os.path.dirname(this_dir)

    # Search the whole tree for the exported location-history file.
    json_path = find_timeline_json(root_dir)
    if not json_path:
        print("❌ Timeline.json file not found!")
        print(f"Searched in: {root_dir}")
        sys.exit(1)

    # Timestamped names keep repeated runs from clobbering earlier reports.
    run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    stats_file = os.path.join(root_dir, f"timeline_statistics_{run_stamp}.txt")
    structure_file = os.path.join(root_dir, f"timeline_structure_{run_stamp}.txt")

    try:
        # Structure report first — it is cheap relative to the full analysis.
        print("📋 Analyzing JSON structure...")
        analyze_json_structure(json_path, structure_file)
        print(f"✅ JSON structure exported to: {structure_file}")

        results = analyze_timeline_data(json_path)

        # Console output, then the same statistics into a file.
        print_statistics(results)

        print(f"\n📄 Exporting statistics to file...")
        export_statistics_to_file(results, stats_file)
        print(f"✅ Statistics exported to: {stats_file}")

        print(f"\n🎉 Analysis complete!")
        print(f"📊 Statistics file: {stats_file}")
        print(f"🏗️ Structure file: {structure_file}")
        print(f"📁 Source file: {json_path}")

    except FileNotFoundError:
        print(f"❌ Error: Could not find Timeline.json at {json_path}")
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"❌ Error: Invalid JSON format in Timeline.json - {e}")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Error analyzing timeline data: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user