#!/usr/bin/env python3
"""
Export Timeline.json data into multiple normalized CSV files.

Generated CSVs (default names):
    semantic_segments.csv
    visits.csv
    timeline_path_points.csv
    raw_signals.csv
    frequent_places.csv
    frequent_trips.csv
    frequent_trip_waypoints.csv
    frequent_trip_mode_distribution.csv
    travel_mode_affinities.csv

Usage:
    python export_timeline_to_csv.py \
        --timeline ../data/Timeline.json \
        --outdir ./timeline_csv \
        --prefix timeline_

If --timeline not supplied, the script searches upward from script dir for
Timeline.json.
"""

import os
import json
import csv
import argparse
from contextlib import ExitStack
from typing import Tuple, Any, Dict, List
from datetime import datetime


# ---------------------------- Helpers ---------------------------------

def find_timeline_json(start_path: str) -> str:
    """Return the path of the first Timeline.json found under *start_path*.

    Walks the directory tree rooted at *start_path*; returns '' when no
    Timeline.json exists anywhere below it.
    """
    for root, _dirs, files in os.walk(start_path):
        if 'Timeline.json' in files:
            return os.path.join(root, 'Timeline.json')
    return ''


def parse_coordinates(point_str: str) -> Tuple[Any, Any]:
    """Parse a '<lat>°, <lon>°' string into a (lat, lon) pair of floats.

    Returns (None, None) for non-strings, empty strings and malformed
    input so the CSV writers emit empty cells instead of crashing on
    odd records.
    """
    if not isinstance(point_str, str):
        return None, None
    try:
        cleaned = point_str.replace('°', '').strip()
        if not cleaned:
            return None, None
        parts = [p.strip() for p in cleaned.split(',')]
        if len(parts) != 2:
            return None, None
        return float(parts[0]), float(parts[1])
    except Exception:
        # Any parse failure (e.g. non-numeric tokens) degrades to empty cells.
        return None, None


def _location_lat_lon(loc: Any) -> Tuple[Any, Any]:
    """Extract (lat, lon) from a location of either shape seen in exports.

    Timeline data carries locations both as a dict wrapping the coordinate
    string under 'latLng' (visit.topCandidate.placeLocation) and as a bare
    coordinate string (frequentPlaces[].placeLocation); handle both.
    """
    if isinstance(loc, dict):
        loc = loc.get('latLng')
    return parse_coordinates(loc)


def ensure_dir(path: str):
    """Create *path* (including parents) if it does not already exist."""
    os.makedirs(path, exist_ok=True)


# ---------------------------- Export Functions -------------------------

def export_semantic_segments(data: Dict, writer):
    """Write one overview row per semantic segment."""
    for idx, seg in enumerate(data.get('semanticSegments', [])):
        writer.writerow({
            'segment_index': idx,
            'startTime': seg.get('startTime'),
            'endTime': seg.get('endTime'),
            'has_visit': int('visit' in seg),
            'has_timeline_path': int('timelinePath' in seg),
        })


def export_visits(data: Dict, writer):
    """Write one row per segment that contains a 'visit' record."""
    for idx, seg in enumerate(data.get('semanticSegments', [])):
        if 'visit' not in seg:
            continue
        visit = seg.get('visit', {})
        top = visit.get('topCandidate', {})
        # placeLocation may be {'latLng': '...'} or a bare coordinate string;
        # _location_lat_lon copes with either shape instead of assuming a dict.
        lat, lon = _location_lat_lon(top.get('placeLocation'))
        writer.writerow({
            'segment_index': idx,
            'hierarchyLevel': visit.get('hierarchyLevel'),
            'visit_probability': visit.get('probability'),
            'top_place_id': top.get('placeId'),
            'top_semantic_type': top.get('semanticType'),
            'top_probability': top.get('probability'),
            'top_lat': lat,
            'top_lon': lon,
            'startTime': seg.get('startTime'),
            'endTime': seg.get('endTime'),
        })


def export_timeline_path_points(data: Dict, writer):
    """Write one row per point of every segment's timelinePath."""
    for idx, seg in enumerate(data.get('semanticSegments', [])):
        path = seg.get('timelinePath')
        if not isinstance(path, list):
            continue
        for p_idx, point_obj in enumerate(path):
            point_str = point_obj.get('point')
            lat, lon = parse_coordinates(point_str)
            writer.writerow({
                'segment_index': idx,
                'point_index': p_idx,
                'time': point_obj.get('time'),
                'raw_point': point_str,
                'lat': lat,
                'lon': lon,
            })


def export_raw_signals(data: Dict, writer):
    """Write one row per raw position signal."""
    for idx, signal in enumerate(data.get('rawSignals', [])):
        pos = signal.get('position', {})
        # Raw signals coordinate key observed as 'LatLng' (capital L);
        # fall back to 'latLng' to tolerate both casings.
        lat, lon = parse_coordinates(pos.get('LatLng') or pos.get('latLng'))
        writer.writerow({
            'raw_index': idx,
            'timestamp': pos.get('timestamp'),
            'lat': lat,
            'lon': lon,
            'accuracyMeters': pos.get('accuracyMeters'),
            'altitudeMeters': pos.get('altitudeMeters'),
            'speedMetersPerSecond': pos.get('speedMetersPerSecond'),
            'source': pos.get('source'),
        })


def export_frequent_places(data: Dict, writer):
    """Write one row per frequent place in the user location profile."""
    profile = data.get('userLocationProfile', {})
    for place in profile.get('frequentPlaces', []) or []:
        lat, lon = _location_lat_lon(place.get('placeLocation'))
        writer.writerow({
            'placeId': place.get('placeId'),
            'label': place.get('label'),
            'lat': lat,
            'lon': lon,
        })


def export_frequent_trips(data: Dict, trips_writer, waypoints_writer, mode_dist_writer):
    """Write frequent trips plus their waypoints and mode-distribution
    entries into three separate CSVs, all keyed by trip_index."""
    profile = data.get('userLocationProfile', {})
    for idx, trip in enumerate(profile.get('frequentTrips', []) or []):
        waypoint_ids = trip.get('waypointIds') or []
        mode_distribution = trip.get('modeDistribution') or []
        trips_writer.writerow({
            'trip_index': idx,
            'startTimeMinutes': trip.get('startTimeMinutes'),
            'endTimeMinutes': trip.get('endTimeMinutes'),
            'durationMinutes': trip.get('durationMinutes'),
            'confidence': trip.get('confidence'),
            'commuteDirection': trip.get('commuteDirection'),
            'waypoint_count': len(waypoint_ids),
            'mode_dist_count': len(mode_distribution),
        })
        for w_idx, wid in enumerate(waypoint_ids):
            waypoints_writer.writerow({
                'trip_index': idx,
                'waypoint_order': w_idx,
                'waypoint_id': wid,
            })
        for m_idx, m in enumerate(mode_distribution):
            # Exact entry structure unknown; preserve it verbatim as JSON.
            mode_dist_writer.writerow({
                'trip_index': idx,
                'entry_index': m_idx,
                'raw_json': json.dumps(m, ensure_ascii=False),
            })


def export_travel_mode_affinities(data: Dict, writer):
    """Write one row per travel-mode affinity in the persona profile."""
    profile = data.get('userLocationProfile', {})
    persona = profile.get('persona', {})
    for aff in persona.get('travelModeAffinities', []) or []:
        writer.writerow({
            'mode': aff.get('mode'),
            'affinity': aff.get('affinity'),
        })


# ---------------------------- Main ------------------------------------

def export_all(data: Dict, outdir: str, prefix: str):
    """Export every CSV into *outdir*, each filename prefixed with *prefix*.

    All files are opened through an ExitStack so every file is closed even
    when an export step — or another file's close() — raises part-way
    through (the manual close-loop this replaces would abort on the first
    failing close and leak the rest).
    """
    ensure_dir(outdir)
    with ExitStack() as stack:
        def open_csv(name: str, fieldnames: List[str]):
            # Open one CSV, register it for guaranteed cleanup, write header.
            fpath = os.path.join(outdir, f"{prefix}{name}.csv")
            f = stack.enter_context(open(fpath, 'w', encoding='utf-8', newline=''))
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            return writer

        export_semantic_segments(data, open_csv(
            'semantic_segments',
            ['segment_index', 'startTime', 'endTime', 'has_visit',
             'has_timeline_path']))
        export_visits(data, open_csv(
            'visits',
            ['segment_index', 'hierarchyLevel', 'visit_probability',
             'top_place_id', 'top_semantic_type', 'top_probability',
             'top_lat', 'top_lon', 'startTime', 'endTime']))
        export_timeline_path_points(data, open_csv(
            'timeline_path_points',
            ['segment_index', 'point_index', 'time', 'raw_point', 'lat', 'lon']))
        export_raw_signals(data, open_csv(
            'raw_signals',
            ['raw_index', 'timestamp', 'lat', 'lon', 'accuracyMeters',
             'altitudeMeters', 'speedMetersPerSecond', 'source']))
        export_frequent_places(data, open_csv(
            'frequent_places', ['placeId', 'label', 'lat', 'lon']))

        # The three frequent-trip CSVs are filled by one pass over the trips.
        w_trips = open_csv(
            'frequent_trips',
            ['trip_index', 'startTimeMinutes', 'endTimeMinutes',
             'durationMinutes', 'confidence', 'commuteDirection',
             'waypoint_count', 'mode_dist_count'])
        w_way = open_csv(
            'frequent_trip_waypoints',
            ['trip_index', 'waypoint_order', 'waypoint_id'])
        w_md = open_csv(
            'frequent_trip_mode_distribution',
            ['trip_index', 'entry_index', 'raw_json'])
        export_frequent_trips(data, w_trips, w_way, w_md)

        export_travel_mode_affinities(data, open_csv(
            'travel_mode_affinities', ['mode', 'affinity']))


def main():
    """CLI entry point: locate Timeline.json, load it, export all CSVs."""
    parser = argparse.ArgumentParser(
        description='Export Timeline.json to multiple CSV files.')
    parser.add_argument('--timeline', type=str,
                        help='Path to Timeline.json (auto-detect if omitted)')
    parser.add_argument('--outdir', type=str, default='timeline_csv',
                        help='Output directory for CSV files')
    parser.add_argument('--prefix', type=str, default='',
                        help='Filename prefix for CSV files')
    args = parser.parse_args()

    if args.timeline:
        timeline_path = args.timeline
    else:
        # NOTE(review): despite the module docstring's "searches upward",
        # this walks *down* the tree rooted two directories above the script.
        search_root = os.path.abspath(
            os.path.join(os.path.dirname(__file__), '..', '..'))
        timeline_path = find_timeline_json(search_root)

    if not timeline_path or not os.path.isfile(timeline_path):
        raise SystemExit('Timeline.json not found. Provide --timeline or place file in repository.')

    print(f'Loading {timeline_path} ...')
    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Timestamped subdirectory keeps repeated runs from overwriting each other.
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    outdir = os.path.join(args.outdir, ts)
    print(f'Exporting CSV files to: {outdir}')
    export_all(data, outdir, args.prefix)
    print('Done.')


if __name__ == '__main__':
    main()