From be4a87303ab5fae7d91811646d6e365c0293fece Mon Sep 17 00:00:00 2001 From: Azeem Fidahusein Date: Thu, 25 Sep 2025 21:01:03 +0100 Subject: [PATCH] json to csv file --- scripts/csv/export_timeline_to_csv.py | 262 ++++++++++++++++++++++++++ 1 file changed, 262 insertions(+) create mode 100644 scripts/csv/export_timeline_to_csv.py diff --git a/scripts/csv/export_timeline_to_csv.py b/scripts/csv/export_timeline_to_csv.py new file mode 100644 index 0000000..6476cde --- /dev/null +++ b/scripts/csv/export_timeline_to_csv.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +""" +Export Timeline.json data into multiple normalized CSV files. + +Generated CSVs (default names): + semantic_segments.csv + visits.csv + timeline_path_points.csv + raw_signals.csv + frequent_places.csv + frequent_trips.csv + frequent_trip_waypoints.csv + frequent_trip_mode_distribution.csv + travel_mode_affinities.csv + +Usage: + python export_timeline_to_csv.py \ --timeline ../data/Timeline.json \ --outdir ./timeline_csv \ --prefix timeline_ + +If --timeline is not supplied, the script searches the directory tree two levels above the script for Timeline.json. 
"""
Export Timeline.json data into multiple normalized CSV files.

Generated CSVs (default names):
    semantic_segments.csv
    visits.csv
    timeline_path_points.csv
    raw_signals.csv
    frequent_places.csv
    frequent_trips.csv
    frequent_trip_waypoints.csv
    frequent_trip_mode_distribution.csv
    travel_mode_affinities.csv

Usage:
    python export_timeline_to_csv.py \
        --timeline ../data/Timeline.json \
        --outdir ./timeline_csv \
        --prefix timeline_

If --timeline is not supplied, the script searches the directory tree
two levels above this script for Timeline.json.
"""

import os
import json
import csv
import argparse
from typing import Tuple, Any, Dict, List
from datetime import datetime

# ---------------------------- Helpers ---------------------------------

def find_timeline_json(start_path: str) -> str:
    """Walk the tree under *start_path* and return the path of the first
    Timeline.json found, or '' when none exists."""
    for root, _dirs, files in os.walk(start_path):
        if 'Timeline.json' in files:
            return os.path.join(root, 'Timeline.json')
    return ''

def parse_coordinates(point_str: Any) -> Tuple[Any, Any]:
    """Parse a Timeline coordinate into ``(lat, lon)`` floats.

    Accepts the plain string form ``"51.5°, -0.1°"`` and the dict form
    ``{'latLng': '51.5°, -0.1°'}`` that some records (e.g. visit
    placeLocation, frequent places) use. Returns ``(None, None)`` for any
    missing or malformed input instead of raising, so export loops never
    abort on a single bad record.
    """
    if isinstance(point_str, dict):
        # Some records wrap the coordinate string; key casing varies.
        point_str = point_str.get('latLng') or point_str.get('LatLng')
    if not isinstance(point_str, str):
        return None, None
    try:
        s = point_str.replace('°', '').strip()
        if not s:
            return None, None
        parts = [p.strip() for p in s.split(',')]
        if len(parts) != 2:
            return None, None
        return float(parts[0]), float(parts[1])
    except ValueError:
        # float() failed on a non-numeric component.
        return None, None

def ensure_dir(path: str):
    """Create *path* (and parents) if it does not already exist."""
    os.makedirs(path, exist_ok=True)

# ---------------------------- Export Functions -------------------------

def export_semantic_segments(data: Dict, writer):
    """Write one overview row per semantic segment (times + content flags)."""
    segments = data.get('semanticSegments', [])
    for idx, seg in enumerate(segments):
        writer.writerow({
            'segment_index': idx,
            'startTime': seg.get('startTime'),
            'endTime': seg.get('endTime'),
            'has_visit': int('visit' in seg),
            'has_timeline_path': int('timelinePath' in seg)
        })

def export_visits(data: Dict, writer):
    """Write one row per segment that contains a 'visit', flattening the
    top place candidate (id, type, probability, coordinates)."""
    for idx, seg in enumerate(data.get('semanticSegments', [])):
        if 'visit' not in seg:
            continue
        visit = seg.get('visit', {})
        top = visit.get('topCandidate', {})
        lat, lon = parse_coordinates(top.get('placeLocation', {}).get('latLng'))
        writer.writerow({
            'segment_index': idx,
            'hierarchyLevel': visit.get('hierarchyLevel'),
            'visit_probability': visit.get('probability'),
            'top_place_id': top.get('placeId'),
            'top_semantic_type': top.get('semanticType'),
            'top_probability': top.get('probability'),
            'top_lat': lat,
            'top_lon': lon,
            'startTime': seg.get('startTime'),
            'endTime': seg.get('endTime')
        })

def export_timeline_path_points(data: Dict, writer):
    """Write one row per GPS point of every segment's 'timelinePath'."""
    for idx, seg in enumerate(data.get('semanticSegments', [])):
        path = seg.get('timelinePath')
        if not isinstance(path, list):
            continue
        for p_idx, point_obj in enumerate(path):
            point_str = point_obj.get('point')
            lat, lon = parse_coordinates(point_str)
            writer.writerow({
                'segment_index': idx,
                'point_index': p_idx,
                'time': point_obj.get('time'),
                'raw_point': point_str,
                'lat': lat,
                'lon': lon,
            })

def export_raw_signals(data: Dict, writer):
    """Write one row per raw position signal (timestamp, coords, accuracy…)."""
    for idx, signal in enumerate(data.get('rawSignals', [])):
        pos = signal.get('position', {})
        # Raw signals coordinate key observed as 'LatLng'; fall back to
        # the camel-case variant used elsewhere in the export.
        lat, lon = parse_coordinates(pos.get('LatLng') or pos.get('latLng'))
        writer.writerow({
            'raw_index': idx,
            'timestamp': pos.get('timestamp'),
            'lat': lat,
            'lon': lon,
            'accuracyMeters': pos.get('accuracyMeters'),
            'altitudeMeters': pos.get('altitudeMeters'),
            'speedMetersPerSecond': pos.get('speedMetersPerSecond'),
            'source': pos.get('source')
        })

def export_frequent_places(data: Dict, writer):
    """Write one row per frequent place in the user location profile.

    'placeLocation' may be either a bare coordinate string or a
    {'latLng': ...} dict (as seen on visits); parse_coordinates handles
    both, so neither form silently exports empty coordinates.
    """
    profile = data.get('userLocationProfile', {})
    for place in profile.get('frequentPlaces', []) or []:
        lat, lon = parse_coordinates(place.get('placeLocation'))
        writer.writerow({
            'placeId': place.get('placeId'),
            'label': place.get('label'),
            'lat': lat,
            'lon': lon
        })

def export_frequent_trips(data: Dict, trips_writer, waypoints_writer, mode_dist_writer):
    """Write frequent trips into three normalized CSVs:
    core trip attributes, one row per waypoint id, and one row per
    mode-distribution entry (kept as raw JSON — exact structure unknown)."""
    profile = data.get('userLocationProfile', {})
    for idx, trip in enumerate(profile.get('frequentTrips', []) or []):
        waypoint_ids = trip.get('waypointIds') or []
        mode_distribution = trip.get('modeDistribution') or []
        trips_writer.writerow({
            'trip_index': idx,
            'startTimeMinutes': trip.get('startTimeMinutes'),
            'endTimeMinutes': trip.get('endTimeMinutes'),
            'durationMinutes': trip.get('durationMinutes'),
            'confidence': trip.get('confidence'),
            'commuteDirection': trip.get('commuteDirection'),
            'waypoint_count': len(waypoint_ids),
            'mode_dist_count': len(mode_distribution)
        })
        for w_idx, wid in enumerate(waypoint_ids):
            waypoints_writer.writerow({
                'trip_index': idx,
                'waypoint_order': w_idx,
                'waypoint_id': wid
            })
        for m_idx, m in enumerate(mode_distribution):
            # Unknown exact structure, store JSON
            mode_dist_writer.writerow({
                'trip_index': idx,
                'entry_index': m_idx,
                'raw_json': json.dumps(m, ensure_ascii=False)
            })

def export_travel_mode_affinities(data: Dict, writer):
    """Write one row per travel-mode affinity of the persona profile."""
    profile = data.get('userLocationProfile', {})
    persona = profile.get('persona', {})
    for aff in persona.get('travelModeAffinities', []) or []:
        writer.writerow({
            'mode': aff.get('mode'),
            'affinity': aff.get('affinity')
        })

# ---------------------------- Main ------------------------------------

def export_all(data: Dict, outdir: str, prefix: str):
    """Run every exporter, writing '<prefix><name>.csv' files into *outdir*.

    All file handles are tracked and closed in the finally block so a
    failure in one exporter does not leak the handles opened before it.
    """
    ensure_dir(outdir)

    def open_csv(name: str, fieldnames: List[str]):
        # Open a CSV, write its header, and return (file, DictWriter).
        # newline='' is required for the csv module on all platforms.
        fpath = os.path.join(outdir, f"{prefix}{name}.csv")
        f = open(fpath, 'w', encoding='utf-8', newline='')
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        return f, writer

    files = []
    try:
        # semantic segments
        f_seg, w_seg = open_csv('semantic_segments', ['segment_index','startTime','endTime','has_visit','has_timeline_path'])
        files.append(f_seg)
        export_semantic_segments(data, w_seg)

        # visits
        f_vis, w_vis = open_csv('visits', ['segment_index','hierarchyLevel','visit_probability','top_place_id','top_semantic_type','top_probability','top_lat','top_lon','startTime','endTime'])
        files.append(f_vis)
        export_visits(data, w_vis)

        # timeline path points
        f_path, w_path = open_csv('timeline_path_points', ['segment_index','point_index','time','raw_point','lat','lon'])
        files.append(f_path)
        export_timeline_path_points(data, w_path)

        # raw signals
        f_raw, w_raw = open_csv('raw_signals', ['raw_index','timestamp','lat','lon','accuracyMeters','altitudeMeters','speedMetersPerSecond','source'])
        files.append(f_raw)
        export_raw_signals(data, w_raw)

        # frequent places
        f_fp, w_fp = open_csv('frequent_places', ['placeId','label','lat','lon'])
        files.append(f_fp)
        export_frequent_places(data, w_fp)

        # frequent trips core
        f_trips, w_trips = open_csv('frequent_trips', ['trip_index','startTimeMinutes','endTimeMinutes','durationMinutes','confidence','commuteDirection','waypoint_count','mode_dist_count'])
        files.append(f_trips)
        # waypoints
        f_way, w_way = open_csv('frequent_trip_waypoints', ['trip_index','waypoint_order','waypoint_id'])
        files.append(f_way)
        # mode distribution
        f_md, w_md = open_csv('frequent_trip_mode_distribution', ['trip_index','entry_index','raw_json'])
        files.append(f_md)
        export_frequent_trips(data, w_trips, w_way, w_md)

        # travel mode affinities
        f_aff, w_aff = open_csv('travel_mode_affinities', ['mode','affinity'])
        files.append(f_aff)
        export_travel_mode_affinities(data, w_aff)

    finally:
        for f in files:
            f.close()


def main():
    """CLI entry point: locate Timeline.json, load it, and export all CSVs
    into a timestamped subdirectory of --outdir."""
    parser = argparse.ArgumentParser(description='Export Timeline.json to multiple CSV files.')
    parser.add_argument('--timeline', type=str, help='Path to Timeline.json (auto-detect if omitted)')
    parser.add_argument('--outdir', type=str, default='timeline_csv', help='Output directory for CSV files')
    parser.add_argument('--prefix', type=str, default='', help='Filename prefix for CSV files')
    args = parser.parse_args()

    if args.timeline:
        timeline_path = args.timeline
    else:
        # Auto-detect: walk the tree rooted two levels above this script.
        timeline_path = find_timeline_json(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

    if not timeline_path or not os.path.isfile(timeline_path):
        raise SystemExit('Timeline.json not found. Provide --timeline or place file in repository.')

    print(f'Loading {timeline_path} ...')
    with open(timeline_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Timestamped subdirectory so repeated runs never overwrite each other.
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    outdir = os.path.join(args.outdir, ts)
    print(f'Exporting CSV files to: {outdir}')
    export_all(data, outdir, args.prefix)
    print('Done.')

if __name__ == '__main__':
    main()