-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathprocess_and_resolve.py
2082 lines (1903 loc) · 83.3 KB
/
process_and_resolve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
from functools import cmp_to_key
from geometry_basics import *
from geometry_search import GeometrySearch, snap_to_closest_way
from twodimsearch import TwoDimSearch
from merge_tags import merge_tags, append_fixme_value
from nvdb_segment import *
from proj_xy import latlon_str
from waydb import GEO_FILL_LENGTH
from tag_translations import ALL_VEHICLES
from nseg_tools import *
_log = logging.getLogger("process")
SMALL_ROAD_RESOLVE_ALGORITHMS = ['default', 'prefer_track', 'prefer_track_static', 'prefer_service', 'prefer_service_static']
MAJOR_HIGHWAYS = [ "trunk", "motorway", "primary", "secondary", "tertiary", "trunk_link", "motorway_link", "primary_link", "secondary_link", "tertiary_link" ]
MINOR_HIGHWAYS = [ "residential", "unclassified", "service", "track" ]
NVDB_USED_KEYS = [
"NVDB_vagnummer",
"NVDB_gagata",
"NVDB_gangfartsomrode",
"NVDB_gangfartsomrode_side",
"NVDB_gatutyp",
"NVDB_motorvag",
"NVDB_motortrafikled",
"NVDB_generaliseringstyp",
"NVDB_cykelvagkat",
"NVDB_guess_lanes",
"NVDB_gagata_side",
"NVDB_layby_side",
"NVDB_rwc_tracks",
"NVDB_road_role",
"NVDB_government_funded",
"NVDB_availability_class",
"KLASS",
"FPVKLASS",
"GCMTYP",
"KONTRUTION",
"OEPNINSBAR"
]
# merge_translated_tags()
#
# Help function to merge tags with special case for fix me tag.
#
def merge_translated_tags(way, tags):
for k, v in tags.items():
if k == "fixme":
append_fixme_value(way.tags, v)
else:
way.tags[k] = v
# find_overlapping_and_remove_duplicates()
#
# Go through a freshly read NVDB layer and log any overlaps and remove any duplicates.
#
def find_overlapping_and_remove_duplicates(data_src_name, ways):
class NvdbSegmentDup:
def __init__(self, nvdb_seg):
self.way = nvdb_seg.way
self.tags = nvdb_seg.tags
self.rlid = nvdb_seg.rlid
def __eq__(self, other):
if not isinstance(other, NvdbSegmentDup):
return False
return self.way == other.way and self.rlid == other.rlid and self.tags == other.tags
def __hash__(self):
return hash(self.rlid)
pointpairs = {}
waysd = {}
new_ways = []
overlap = False
for way in ways:
wd = NvdbSegmentDup(way)
if wd not in waysd:
waysd[wd] = way
else:
#way2 = waysd[wd]
# duplicates are so normal and common, so we don't care to log them any longer
#if isinstance(way.way, list):
# print("Duplicate ways:")
# print(" Way 1: %s tags=%s (%s points, index %s)" % (way, way.tags, len(way.way), way.way_id))
# print(" Way 2: %s tags=%s (%s points, index %s)" % (way2, way2.tags, len(way2.way), way2.way_id))
#else:
# print("Duplicate nodes:")
# print(" Node 1: %s tags=%s (%s, index %s)" % (way, way.tags, way.way, way.way_id))
# print(" Node 2: %s tags=%s (%s, index %s)" % (way2, way2.tags, way2.way, way2.way_id))
continue
# overlapping segments are quite normal, but may be interesting to print, except for some
# data layers when there's always lots of overlaps (it must be for some layers, for directional speed limits
# for example)
if isinstance(way.way, list) and data_src_name not in ("NVDB-Hastighetsgrans", "NVDB-Vagnummer"):
it = iter(way.way)
prev = next(it)
for p in it:
pp = (p, prev)
prev = p
if pp not in pointpairs:
pointpairs[pp] = way
else:
way2 = pointpairs[pp]
_log.info(f"Self-overlapping segment between {latlon_str(pp[0])}-{latlon_str(pp[1])}:")
_log.info(f" Segment 1: {way} tags={way.tags} ({len(way.way)} points, index {way.way_id})")
_log.info(f" Segment 2: {way2} tags={way2.tags} ({len(way2.way)} points, index {way2.way_id})")
overlap = True
break
new_ways.append(way)
if len(new_ways) < len(ways):
_log.info(f"{data_src_name} has duplicate elements. Only one copy of each was kept")
if overlap:
_log.info(f"{data_src_name} has overlapping segments.")
return new_ways
# preprocess_laybys
#
# OSM tradition is to map layby parkings as unattached nodes beside the road rather than on it,
# (even if that makes routing difficult).
#
# Here we move the layby nodes parallel to the road
#
def preprocess_laybys(points, way_db):
LAYBY_OFFSET = 7
for node in points:
ways = way_db.gs.find_all_nearby_ways(node.way)
_, snap, way = snap_to_closest_way(ways, node.way)
if way is None:
# this can happen near borders where roads have been cut but laybys kept
_log.warning(f"Did not find nearby way for layby {node.rlid}, keeping position as is")
continue
for idx, p in enumerate(way.way):
if idx == 0:
continue
prev = way.way[idx-1]
is_between, _ = point_between_points(snap, prev, p, 1e-6)
if not is_between:
continue
prev, p = rotate_90deg(prev, p)
oldlen = dist2d(prev, p)
xd = (p.x - prev.x) / oldlen * LAYBY_OFFSET
yd = (p.y - prev.y) / oldlen * LAYBY_OFFSET
if node.tags.get("NVDB_layby_side", None) == "right":
node.way.x = snap.x - xd
node.way.y = snap.y - yd
else:
node.way.x = snap.x + xd
node.way.y = snap.y + yd
break
return points
# preprocess_footcycleway_crossings
#
# NVDB NVDB-GCM_passage (crossings for cycleways and footways) points are often placed a bit
# beside the actual crossings so we need scan the geometry and find which crossings they belong
# and align.
#
# NVDB-GCM_vagtyp data must be in the database for this function to work.
#
def preprocess_footcycleway_crossings(points, way_db):
def has_value_tags(tags):
for k in tags.keys():
if k not in NVDB_GEOMETRY_TAGS:
return True
return False
_log.info("Preprocess footway/cycleway crossings...")
crossings = []
unconnected_count = 0
connected_count = 0
for node in points:
if not has_value_tags(node.tags):
# empty, exclude this node
continue
cp = way_db.gs.find_crossing_points_within(node.way, 5)
if len(cp) == 0:
# This can happen normally in newly built areas where roads are not complete, or
# where crossings are mapped but not related footway
unconnected_count += 1
else:
# Snap to suitable crossing
min_dist = None
min_p = None
for pl in cp:
dist = dist2d(node.way, pl[0])
has_other = False
has_gcm = False
for way in pl[1]:
if "GCMTYP" in way.tags:
has_gcm = True
else:
has_other = True
if has_gcm and has_other and (min_dist is None or dist < min_dist):
min_dist = dist
min_p = pl[0]
if min_p is None:
unconnected_count += 1
else:
node.way = min_p
connected_count += 1
crossings.append(node)
_log.info(f"done ({connected_count} attached to a way crossing, {unconnected_count} without)")
return crossings
# preprocess_bridges_and_tunnels()
#
# Convert NVDB bridges and tunnels into OSM tags.
#
# - The NVDB bridge/tunnel data has a considerable amount of errors
# - Missing bridges, especially for cycleway
# - Sometimes incorrect tags (bridge that should be under passage etc)
# - Some alignment errors of under passages missing the bridge above
# - Sometimes bridges are off by ~100 meters (rare?).
# - Errors are more prominent in city areas like Stockholm with many long bridges and tunnels,
# in rural areas the data is often mostly correct
#
# - It's not possible to resolve the "layer" tag for OSM data. If the NVDB data would be 100%
# correct it would be possible to make good educated guesses with an advanced algorithm. However
# as the data is not that good and multi-layer bridges are rare, we've not made an algoritm for
# that, a fix me tag is added instead when crossing bridges and tunnels are detected.
#
# - OSM mapping tradition is to map a cycleway/footway that passes under a street as a tunnel,
# even when the construction is technically a short bridge. In NVDB this is mapped as överfart
# on the street and underfart on the cycleway. This is resolved here, converting to the OSM
# mapping tradition.
#
def preprocess_bridges_and_tunnels(ways, way_db):
def is_short_bridge(bridge, bridges, short_bridge):
dist, _ = calc_way_length(bridge.way)
if dist > short_bridge:
return False
measured_bridges = { bridge }
for b in bridges:
if b in measured_bridges:
continue
if dist2d(b.way[0], bridge.way[0]) < 0.05 or dist2d(b.way[0], bridge.way[-1]) < 0.05:
dist2, _ = calc_way_length(b.way)
dist += dist2
if dist > short_bridge:
return False
measured_bridges.add(b)
bridge = b
return True
# Here (before being merged with other layers) bridges/tunnels are in the original lengths
# ways may still be broken into parts
bridges = []
underways = []
tunnels = []
bridges_lcs = GeometrySearch(GEO_FILL_LENGTH)
tunnels_lcs = GeometrySearch(GEO_FILL_LENGTH)
for way in ways:
konst = way.tags["KONTRUTION"]
# Due to incorrect tagging of "överfart" vs "överfart och underfart" and that
# "överfart och underfart" is very rare, we handle them the same as "överfart"
if konst in ("överfart", "överfart och underfart"):
bridges.append(way)
bridges_lcs.insert(way)
elif konst == "underfart":
underways.append(way)
elif konst == "tunnel":
tunnels.append(way)
tunnels_lcs.insert(way)
else:
raise RuntimeError(f"Unhandled tag name {konst}")
# First go through under passages, and convert to tunnel if needed.
_log.info(f"Checking {len(underways)} under-passages relation to {len(bridges)} bridge segments...")
SHORT_BRIDGE_LENGTH = 12
delete_bridges = set()
convert_to_tunnels = []
for way in underways:
crossings = bridges_lcs.find_crossing_ways(way)
short_bridge_count = 0
for bridge, _ in crossings:
if is_short_bridge(bridge, bridges, SHORT_BRIDGE_LENGTH):
short_bridge_count += 1
if len(crossings) == 0:
# this sometimes happens due to poor alignment or a missing bridge, but can also be crossed by railway bridges
# (not included in the data)
#print("Underfart %s not crossed" % way.rlid)
pass
elif short_bridge_count == len(crossings):
#print("Underfart %s crossed only by %s short bridges, converting to tunnel and removing bridges" % (way.rlid, len(crossing)))
convert_to_tunnels.append(way)
for bridge, _ in crossings:
delete_bridges.add(bridge)
for way in convert_to_tunnels:
way.tags["tunnel"] = "yes"
way.tags["layer"] = -1
nbridges = []
bridges_lcs = GeometrySearch(GEO_FILL_LENGTH)
for way in bridges:
if not way in delete_bridges:
bridges_lcs.insert(way)
nbridges.append(way)
bridges = nbridges
_log.info('done')
if len(convert_to_tunnels) > 0:
_log.info(f"{len(convert_to_tunnels)} under-passages was converted to tunnel and the related {len(delete_bridges)} short bridges were removed")
_log.info(f"Processing {len(bridges)} bridge segments...")
fixme_count = 0
for way in bridges:
tags = {}
tags["bridge"] = "yes"
tags["layer"] = 1
if len(bridges_lcs.find_crossing_ways(way, abort_at_first=True)) > 0:
# in theory we could do a better job if we looked at "överfart och underfart" separately
# and analyzed the connecting road network, but as the data is often not fully correct and
# these situations are rare, we just add a FIXME in these situations.
tags["fixme"] = "could not resolve layer"
fixme_count += 1
merge_translated_tags(way, tags)
_log.info('done')
if fixme_count > 0:
_log.warning(f"{fixme_count} bridge segments crosses other bridges, cannot resolve layers, fixme tags added")
if len(tunnels) > 0:
_log.info("Setting up search datastructure for crossing lines...")
all_lcs = GeometrySearch(GEO_FILL_LENGTH)
all_lcs.insert_waydb(way_db.way_db)
_log.info("done")
_log.info(f"Processing {len(tunnels)} tunnel segments...")
fixme_count = 0
for way in tunnels:
tags = {}
tags["tunnel"] = "yes"
if len(tunnels_lcs.find_crossing_ways(way, abort_at_first=True)) > 0:
fixme_count += 1
tags["layer"] = -1
tags["fixme"] = "could not resolve layer"
else:
# no crossing ways => no layer tag
if len(all_lcs.find_crossing_ways(way, abort_at_first=True)) > 0:
tags["layer"] = -1
merge_translated_tags(way, tags)
_log.info('done')
if fixme_count > 0:
_log.warning(f"{fixme_count} tunnel segments crosses other tunnel, cannot resolve layers, fixme tags added")
return bridges + tunnels + convert_to_tunnels
# process_street_crossings()
#
# Process the NVDB point layer Korsning and geometry it affects. This includes
# setting names on roundabouts and snapping the points to actual crossings in the
# line geometry.
#
def process_street_crossings(points, way_db, data_src_name):
def get_roundabout_ways(point_on_roundabout, way_db):
ways = way_db.gs.find_all_connecting_ways(point_on_roundabout)
rbw = set()
for w in ways:
if w.tags.get("junction", "") == "roundabout":
rbw.add(w)
last_size = len(rbw)
while True:
nrbw = set()
for w in rbw:
ways = way_db.gs.find_all_connecting_ways(w.way[0]).copy()
ways.update(way_db.gs.find_all_connecting_ways(w.way[-1]))
for w1 in ways:
if w1.tags.get("junction", "") == "roundabout":
nrbw.add(w1)
rbw.update(nrbw)
if len(rbw) == last_size:
break
last_size = len(rbw)
return rbw
named_rbw = set()
crossings = []
fixme_count = 0
for node in points:
if not "highway" in node.tags and node.tags["NVDB_generaliseringstyp"] != "cirkulationsplats":
# no useful info, skip this crossing
continue
cp = way_db.gs.find_crossing_points_within(node.way, 2)
fixme = False
if len(cp) == 0:
fixme = True
else:
# Snap to suitable crossing
min_dist = None
min_p = None
for pl in cp:
dist = dist2d(node.way, pl[0])
if (min_dist is None or dist < min_dist):
min_dist = dist
min_p = pl[0]
if min_p is None:
fixme = True
else:
node.way = min_p
if fixme:
if not "highway" in node.tags:
# ignore
continue
fixme_count += 1
merge_translated_tags(node, {"fixme": "no suitable street crossing found"})
if node.tags["NVDB_generaliseringstyp"] == "cirkulationsplats" and node.tags.get("name", None) is not None:
rbw = get_roundabout_ways(node.way, way_db)
# if there are already names on the roads, we put roundabout name as the main one
for w in rbw:
if "name" in w.tags:
merge_name = w.tags["name"]
w.tags["name"] = node.tags["name"]
else:
merge_name = node.tags["name"]
merge_tags(w, {"name": merge_name}, data_src_name)
named_rbw.add(w)
if "highway" in node.tags:
del node.tags["name"]
crossings.append(node)
if fixme_count > 0:
_log.warning(f"did not find any way crossing for {fixme_count} street crossings, fixme tags added")
# OSM mandates that roundabouts should not have street names, but only be named if the roundabout has
# a specific name, so we make a second pass and remove
remove_roundabout_street_names = True
if remove_roundabout_street_names:
remove_count = 0
for way in way_db:
if way.tags.get("junction", None) == "roundabout":
way.tags.pop("alt_name", None)
if way not in named_rbw:
if way.tags.pop("name", None) is not None:
remove_count += 1
_log.info(f"Removed {remove_count} roundabout street names")
return crossings
# preprocess_railway_crossings()
#
def preprocess_railway_crossings(points, way_db, railways):
_log.info("Setting up search data structure for railways (to snap railway crossings)...")
rw_gs = GeometrySearch(1000)
for w in railways:
rw_gs.insert(w)
_log.info("done")
fixme_count = 0
rw_crossings = []
for node in points:
ways = way_db.gs.find_all_nearby_ways(node.way)
cp = []
crossing_tag = "level_crossing"
for w in ways:
if w.rlid == node.rlid:
if "KLASS" not in w.tags:
crossing_tag = "crossing"
crossings = rw_gs.find_crossing_ways(w)
for w_and_cp in crossings:
crossing_point = w_and_cp[1]
dist = dist2d(node.way, crossing_point)
cp.append((dist, crossing_point))
cp = sorted(cp, key=lambda x: x[0])
fixme = False
node.tags["railway"] = crossing_tag
if len(cp) == 0:
fixme = True
rw_crossings.append(node)
else:
# Snap to suitable crossing(s)
track_count = node.tags.get("NVDB_rwc_tracks", 1)
for i in range(0, track_count):
try:
dist = cp[i][0]
except IndexError:
_log.warning(f"More tracks than crossings {track_count} {i} {cp}")
dist = 500
# as we match RLID it's low risk to get the crossing wrong, so we can use a large max dist
# Offset errors of 150 meters have been observed (Lycksele data set)
if dist > 200:
fixme = True
rw_crossings.append(node)
break
crossing_point = cp[i][1]
rwc = node.make_copy_new_way(crossing_point)
rw_crossings.append(rwc)
if fixme:
fixme_count += 1
merge_translated_tags(node, {"fixme": "no nearby railway crossing found"})
if fixme_count > 0:
_log.warning(f"did not find any actual crossing between railway and way for {fixme_count} railway crossings, fixme tags added")
return rw_crossings
# parse_road_number()
#
# Split text road numbers into components
#
def parse_road_number(rn):
rn = str(rn)
if rn[0].isalpha():
rns = rn.split(' ')
prefix = rns[0]
rest = rns[1]
else:
prefix = ""
rest = rn
rns = rest.split('.')
main_nr = int(rns[0])
if len(rns) > 1:
sub_nr = int(rns[1])
else:
sub_nr = 0
# E is also "Östergötlands län"
is_e_road = prefix == 'E' and main_nr < 500
return prefix, main_nr, sub_nr, is_e_road
# compare_vagnummer()
#
# Sort function for road numbers, used when there is more than one road number on the same road segment.
#
def compare_vagnummer(r1, r2):
p1, n1, u1, r1_is_e = parse_road_number(r1)
p2, n2, u2, r2_is_e = parse_road_number(r2)
# E roads get sorted first
if r1_is_e != r2_is_e:
if r1_is_e:
return -1
return 1
# Sort on länsbokstav
if p1 > p2:
return 1
if p1 < p2:
return -1
# Sort on main number
if n1 != n2:
return n1 - n2
# Sort on under number
return u1 - u2
# resolve_highways()
#
# Using information from multiple layers, figure out what the highway tag should be (and some side tags)
#
def resolve_highways(way_db, small_road_resolve_algorithm):
_log.info("Resolve highway tags...")
fixme_count = 0
gcm_resolve_crossings = []
for way in way_db:
# convert tags
if "NVDB_vagnummer" in way.tags:
refs = way.tags["NVDB_vagnummer"]
if isinstance(refs, list):
refs.sort(key=cmp_to_key(compare_vagnummer))
way.tags["ref"] = refs
klass = int(way.tags.get("KLASS", -1))
tags = {}
if "GCMTYP" in way.tags:
# Note GCM-typ values was changed by Trafikverket in November 2021. This code handles only the new values
gcmtyp = way.tags["GCMTYP"]
if gcmtyp in (100, "Gång- och cykelbana"):
tags["highway"] = "cycleway"
tags["foot"] = "yes"
elif gcmtyp in (105, "Gång- och cykelbana, uppdelad"):
tags["highway"] = "cycleway"
tags["segregated"] = "yes"
tags["foot"] = "yes"
elif gcmtyp in (110, "Cykelbana, påbjuden"):
tags["highway"] = "cycleway"
tags["foot"] = "no"
elif gcmtyp in (115, "Gångbana"):
tags["highway"] = "footway"
elif gcmtyp in (120, "Cykelfält"):
tags["highway"] = "cycleway"
elif gcmtyp in (125, "Cykelpassage och övergångsställe"):
# TODO difference with cykelöverfart not seen. Cykelpassage = bilar har ingen väjningsplikt för cyklar
tags["highway"] = "cycleway"
tags["cycleway"] = "crossing"
tags["crossing"] = "marked"
tags["foot"] = "yes"
elif gcmtyp in (130, "Cykelöverfart och övergångsställe"):
tags["highway"] = "cycleway"
tags["cycleway"] = "crossing"
tags["crossing"] = "marked"
tags["foot"] = "yes"
elif gcmtyp in (135, "Cykelpassage"):
# Defined as for cycling only, but in some places it's connected to "gång- och cykelbana" on both sides
tags["highway"] = "cycleway"
gcm_resolve_crossings.append(way)
elif gcmtyp in (140, "Cykelöverfart"):
tags["highway"] = "cycleway"
tags["cycleway"] = "crossing"
tags["crossing"] = "marked"
elif gcmtyp in (145, "Övergångsställe"):
# Defined as footway only, but unfortunately in many places in NVDB it's also used for cycleways
tags["crossing"] = "marked"
tags["highway"] = "path" # refined later
gcm_resolve_crossings.append(way)
elif gcmtyp in (150, "Gatupassage utan utmärkning"):
# This is mostly used for unmarked footway crossings, but in some situations
# it's also a cycleway crossing, we need to resolve that by looking at connecting ways
tags["crossing"] = "unmarked"
tags["highway"] = "path" # refined later
gcm_resolve_crossings.append(way)
elif gcmtyp in (155, "Trappa"):
tags["highway"] = "steps"
elif gcmtyp in (160, "Torg/Öppen yta"):
tags["highway"] = "cycleway"
tags["foot"] = "yes"
elif gcmtyp in (165, "Annan cykelbar förbindelse"): # Annan cykelbar förbindelse
# This type unfortunately has multi-uses. In larger cities it's commonly used to
# connect disconnected cycleways, eg in places you need to pass 10 - 20 meters of
# pavement to get on to the next section. But it's also used for longer sections
# of unpaved tracks that make practical links for cyclists but are not really
# maintained as cycleways (this is the official definition in NVDB).
#
# To differ between these we look at road surface, and if it's marked oneway
# (happens in some cases in cities) we also upgrade it to cycleway
#
if "oneway" in way.tags or way.tags.get("surface", "unpaved") != "unpaved":
tags["highway"] = "cycleway"
tags["foot"] = "yes"
else:
tags["highway"] = "path"
tags["bicycle"] = "yes"
elif gcmtyp in (170, "Annan ej cykelbar förbindelse"):
# These may be ridable anyway, so we don't dare to set bicycle=no
tags["highway"] = "path"
elif gcmtyp in (175, "Hiss"):
tags["highway"] = "elevator"
else:
raise RuntimeError(f"Unknown GCM-typ {gcmtyp} for {way.rlid}")
elif "NVDB_cykelvagkat" in way.tags:
# value is one of "Regional cykelväg", "Huvudcykelväg", "Lokal cykelväg", we tag all the same
tags["highway"] = "cycleway"
tags["foot"] = "yes"
elif "NVDB_gagata" in way.tags:
# We ignore NVDB_gagata_side, from investigations it doesn't seem to provide any valuable information
tags["highway"] = "pedestrian"
if way.tags.get("width", 1000) < 3:
# JOSM doesn't like pedestrian roads narrower than 3 meters
tags["highway"] = "cycleway"
tags["foot"] = "yes"
way.tags.pop("maxspeed", None) # cycleways shouldn't have maxspeed
elif "NVDB_gangfartsomrode" in way.tags:
# We ignore NVDB_gangfartsomrode_side, from investigations it seems that even if
# on only one side the speed limit is set to 5 km/h.
tags["highway"] = "living_street"
elif "NVDB_motorvag" in way.tags:
tags["highway"] = "motorway"
elif "NVDB_motortrafikled" in way.tags:
tags["highway"] = "trunk"
tags["motorroad"] = "yes"
elif "ref" in way.tags: # road number
# Note that even if a road has a road number (and is officially for example "Primär Länsväg") we
# still use KLASS instead of road number to set primary/secondary/tertiary. It's common that
# functional class changes even if the road number is the same (as the road gets into more rural areas,
# functional class is often lowered)
#
# There is a less detailed function road class, FPVKLASS, which matches better to what is currently
# mapped in OSM, so that is used when it results in a higher level than KLASS.
#
# A road with road number will not get worse than tertiary.
#
if klass == -1:
#raise RuntimeError("KLASS is missing for RLID %s (ref %s)" % (way.rlid, way.tags["NVDB_vagnummer"]));
#print("Warning: KLASS is missing for RLID %s (ref %s)" % (way.rlid, way.tags["NVDB_vagnummer"]));
tags["fixme"] = "could not resolve highway tag"
else:
fpvklass_translations = {
"Nationella vägar": 0,
"Regionalt viktiga vägar": 1,
"Kompletterande regionalt viktiga vägar": 2,
1: 0,
2: 1,
3: 2
}
if way.tags.get("FPVKLASS", -1) not in fpvklass_translations:
fpv_level = 1000
else:
fpv_level = fpvklass_translations[way.tags.get("FPVKLASS", -1)]
klass = int(way.tags["KLASS"])
if klass <= 1:
k_level = 0 # trunk
elif klass <= 2:
k_level = 1 # primary
elif klass <= 4:
k_level = 2 # secondary
else:
k_level = 3 # tertiary
if fpv_level < k_level:
k_level = fpv_level
levels = [ "trunk", "primary", "secondary", "tertiary" ]
tags["highway"] = levels[k_level]
elif "NVDB_gatutyp" in way.tags and way.tags["NVDB_gatutyp"] != "Övergripande länk":
gatutyp = way.tags["NVDB_gatutyp"]
if gatutyp == "Övergripande länk":
raise RuntimeError() # should already be handled
if gatutyp == "Huvudgata":
if klass <= 1:
tags["highway"] = "trunk" # 0, 1
elif klass <= 2:
tags["highway"] = "primary" # 2
elif klass <= 4:
tags["highway"] = "secondary" # 3, 4
elif klass <= 5:
tags["highway"] = "tertiary" # 5
else:
tags["highway"] = "residential"
elif gatutyp == "Lokalgata stor":
tags["highway"] = "residential"
elif gatutyp == "Lokalgata liten":
tags["highway"] = "residential"
elif gatutyp == "Kvartersväg":
tags["highway"] = "service"
elif gatutyp == "Parkeringsområdesväg":
tags["highway"] = "service"
tags["service"] = "parking_aisle"
elif gatutyp == "Infartsväg/Utfartsväg":
tags["highway"] = "service"
elif gatutyp == "Leveransväg":
tags["highway"] = "unclassified"
elif gatutyp == "Småväg":
tags["highway"] = "unclassified"
else:
raise RuntimeError(f"Unknown gatutyp {gatutyp}")
elif "KLASS" in way.tags:
# KLASS (from FunkVagklass) on it's own is used here last as a fallback
# when there is no other information to rely on. KLASS is a metric on how
# important a road is, and it depends on context. KLASS 8 can for example
# be used both on forestry roads in rural areas and on living and pedestrian
# streets in a city.
#
# For forestry roads the official definition is: 7 huvudväg, 8 normalväg,
# 9, nollväg. However, the distinction between 8 and 9 in the actual NVDB
# data is not that good. From testing the least bad default seems to be
# to map both 8 and 9 to track.
#
# City roads should normally already been resolved by other layers, so here
# we apply the highway tag as best suited in rural areas. Exception:
# The NVDB-Gatunamn which provides NVDB_road_role tag is used to differ
# between service/unclassified and track on KLASS 8 and 9. (Names on forestry
# and other private roads comes from NVDB-Ovrigt_vagnamn and doesn't have
# role tag set)
#
# Special case for ferry routes (shouldn't have a highway tag)
prefer_service = small_road_resolve_algorithm in ['prefer_service', 'prefer_service_static']
if klass <= 1:
tags["highway"] = "trunk" # 0, 1
elif klass <= 2:
tags["highway"] = "primary" # 2
elif klass <= 4:
tags["highway"] = "secondary" # 3, 4
elif klass <= 6:
tags["highway"] = "tertiary" # 5, 6
elif klass <= 7:
tags["highway"] = "unclassified" # 7
elif klass <= 8:
if way.tags.get("NVDB_government_funded", "no") == "yes" or "NVDB_road_role" in way.tags:
tags["highway"] = "unclassified" # 8
elif "NVDB_availability_class" not in way.tags and prefer_service:
tags["highway"] = "service" # 8
else:
tags["highway"] = "track" # 8
else:
if way.tags.get("NVDB_government_funded", "no") == "yes" or "NVDB_road_role" in way.tags or \
"NVDB_availability_class" not in way.tags and prefer_service:
tags["highway"] = "service" # 9
else:
tags["highway"] = "track" # 9
# Special case for ferry
if way.tags.get("route", None) == "ferry":
tags["ferry"] = tags["highway"]
tags.pop("highway")
if tags["ferry"] in [ "track", "service" ]:
tags["ferry"] = "unclassified"
tags["foot"] = "yes"
tags["bicycle"] = "yes"
tags["motor_vehicle"] = "yes"
else:
#print("Warning: information missing to resolve highway tag for RLID %s, adding fixme tag" % way.rlid)
tags["fixme"] = "could not resolve highway tag"
# check if we should make this a link
if tags.get("highway", None) in ["motorway", "trunk", "primary", "secondary", "tertiary"] and \
way.tags.get("NVDB_road_role", None) == 4 and way.tags.get("junction", None) != "roundabout":
tags['highway'] += "_link"
if "fixme" in tags:
fixme_count += 1
merge_translated_tags(way, tags)
# Second pass for things we couldn't resolve in the first pass
if len(gcm_resolve_crossings) > 0:
# group together all crossing segments that are connected to eachother
crossings = []
processed_set = set()
_log.debug(f"Resolving {len(gcm_resolve_crossings)} GCM crossing segments")
for way in gcm_resolve_crossings:
if way in processed_set:
continue
crossings.append([ way ])
processed_set.add(way)
# in rare occasions way is a loop, the set() trick makes sure we don't run the same endpoint twice
for ep in set([ way.way[0], way.way[-1] ]):
ways = way_db.gs.find_all_connecting_ways(ep)
ways.remove(way)
for w in ways:
if w in gcm_resolve_crossings and w not in processed_set:
crossings[-1].append(w)
processed_set.add(w)
_log.debug(f"Resolving {len(crossings)} GCM crossings")
# if the crossing has two or more cycleway connections at different endpoints, make cycleway,
# if the crossing has only one cycleway connection but no footway connection (at a different endpoint), still make cycleway,
# otherwise make footway
for crossing in crossings:
cw_count = 0
fw_count = 0
for way in crossing:
for ep in set([ way.way[0], way.way[-1] ]):
ways = way_db.gs.find_all_connecting_ways(ep)
connected_to_footway = False
for w in ways:
if w.tags.get("highway", None) == "cycleway":
cw_count += 1
connected_to_footway = False # If connected to both, we only count cycleway
break
if w.tags.get("highway", None) == "footway":
connected_to_footway = True
if connected_to_footway:
fw_count += 1
_log.debug(f"Crossing {crossing[0].rlid} has {cw_count} cw and {fw_count} fw")
for way in crossing:
if (cw_count == 1 and fw_count == 0) or cw_count > 1:
way.tags["highway"] = "cycleway"
way.tags["cycleway"] = "crossing"
way.tags["foot"] = "yes"
else:
way.tags["highway"] = "footway"
way.tags["footway"] = "crossing"
if fixme_count > 0:
_log.warning(f"could not resolve tags for {fixme_count} highway segments, added fixme tags")
_log.info("done")
# get_connected_roads()
#
def get_connected_roads(ways, gs, criteria_fun):
processed = set()
roads = []
for way in ways:
if way in processed or not criteria_fun(way):
continue
road = set()
new_segs = { way }
while len(new_segs) > 0:
road.update(new_segs)
prev_segs = new_segs
new_segs = set()
for w0 in prev_segs:
for w in gs.find_all_connecting_ways([w0.way[0], w0.way[-1]]):
if w not in road and criteria_fun(w):
new_segs.add(w)
processed.update(road)
roads.append(road)
return roads
# upgrade_unclassified_stumps_connected_to_residential
#
def upgrade_unclassified_stumps_connected_to_residential(way_db):
_log.info("Upgrading short unclassified stumps to residential (if connected to residential)...")
roads = get_connected_roads(way_db, way_db.gs, lambda way: way.tags.get("highway", None) == "unclassified")
upgrade_count = 0
for road in roads:
total_length = 0
has_connected_residential = False
for way in road:
length, _ = calc_way_length(way.way)
total_length += length
if total_length > 1000:
break
if not has_connected_residential:
for w in way_db.gs.find_all_connecting_ways([way.way[0], way.way[-1]]):
if w not in road and w.tags.get("highway", None) == "residential":
has_connected_residential = True
break
if total_length <= 1000 and has_connected_residential:
for way in road:
_log.debug(f"Upgraded {way.rlid} from unclassified to residential due to being short stump connected to residential.")
way.tags["highway"] = "residential"
upgrade_count += 1
_log.info(f"done ({upgrade_count} segments upgraded from unclassified to residential)")
# guess_upgrade_tracks
#
# There's not enough information for small roads (KLASS (7) 8, 9) so highway tag cannot be 100%
# correctly resolved, so manual adjustment will be required. The goal of this function is to
# make guesses that minimizes the need of manual adjustment
#
# Info not used and why:
# väghållare / road maintainer:
# - forest companies also maintain roads that we want to tag service
# - in many municipalities all väghållare is just the same for 8/9, "enskild" with no further
# information
# tillgänglighetsklass / availability class (NVDB-Tillganglighet A,B,C,D):
# - actual value has quirky contents in forestry network, many roads at a higher class than it
# should be
# - however if the value exists or not is a lead, value doesn't exist on driveways, unfortunately
# it's quite often missing also on some forestry roads and other tracks as well
#
#
# This function must be run after resolve_highways so the basic work is already done
#
def guess_upgrade_tracks(way_db):
def get_deadend_parent_ways(way):
ways1 = way_db.gs.find_all_connecting_ways(way.way[0])
ways2 = way_db.gs.find_all_connecting_ways(way.way[-1])
if len(ways1) == 1 and len(ways2) > 1:
ways2 = ways2.copy()
return [w for w in ways2 if w != way], way.way[-1]
if len(ways1) > 1 and len(ways2) == 1:
ways1 = ways1.copy()
return [w for w in ways1 if w != way], way.way[0]
return None, None
def way_has_a_gate(way):
for p in way.way:
if p in way_db.point_db:
nodes = way_db.point_db[p]
for node in nodes:
if node.tags.get("barrier", None) == "gate":
return True
return False
def is_deadend(way):
ways, _ = get_deadend_parent_ways(way)
return ways is not None
def angle_between_ways(cp, w1, w2, go_right=True):
p1 = None
p3 = None
for idx, p in enumerate(w1.way):
if p == cp:
if go_right:
if idx == len(w1.way)-1:
p1 = w1.way[idx-1]
else:
p1 = w1.way[idx+1]
else:
if idx == 0:
p1 = w1.way[idx+1]
else:
p1 = w1.way[idx-1]
break
for idx, p in enumerate(w2.way):
if p == cp:
if go_right:
if idx == len(w2.way)-1:
p3 = w2.way[idx-1]