"""Functions for working with OpenStreetMap data"""
import json
import os
import re
import sys
import psycopg2
from sources import util
DBNAME = "underfoot_osm"
DB_USER = "underfoot"
DB_PASSWORD = "underfoot"
WAYS_TABLE_NAME = "underfoot_ways"
NATURAL_WAYS_TABLE_NAME = "underfoot_natural_ways"
NATURAL_NODES_TABLE_NAME = "underfoot_natural_nodes"
PLACE_NODES_TABLE_NAME = "underfoot_place_nodes"


def create_database():
"""Create the OSM database"""
util.call_cmd(["dropdb", "--if-exists", DBNAME], check=True)
util.call_cmd(["createdb", DBNAME])
util.call_cmd([
"psql", "-d", DBNAME,
"-c", "CREATE EXTENSION postgis; CREATE EXTENSION hstore;"])
return psycopg2.connect(f"dbname={DBNAME}")


def database_connection(recreate=False):
"""Return a psycopg database connection or make the database"""
if recreate:
return create_database()
# Check to see if db exists
try:
return psycopg2.connect(f"dbname={DBNAME}")
except psycopg2.OperationalError:
return create_database()


def fetch_data(url, clean=False):
"""Fetch data from URL"""
filename = os.path.basename(url)
    if clean or not os.path.isfile(filename):
        fifteen_mins = 15.0 * 60
        util.call_cmd(
            ["curl", "-o", filename, "--max-time", str(fifteen_mins), url],
            check=True
        )
return filename
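
# A minimal usage sketch (the Geofabrik URL is illustrative, not something
# this module depends on; any OSM PBF extract should work):
#   filename = fetch_data(
#       "https://download.geofabrik.de/north-america/us/california-latest.osm.pbf")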


def load_osm_from_pbf(data_path, pack=None):
    """Load OSM data from a PBF export into a PostgreSQL database using imposm"""
read_args = [
"import",
"-connection", f"postgis://{DB_USER}:{DB_PASSWORD}@localhost/{DBNAME}?prefix=NONE",
"-mapping", "imposm-mapping.yml",
"-read", data_path
]
    # Limit the import to the pack's boundary if one was given
if pack:
read_args += [
"-limitto", pack["geojson_path"]
]
write_args = [
"-write",
"-deployproduction",
"-overwritecache"
]
cmd = [os.path.join("bin", "imposm")] + read_args + write_args
# Load data from PBF into the database
util.call_cmd(cmd)
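
# The assembled imposm command looks roughly like this (a sketch; the data
# path and -limitto file depend on the caller):
#   bin/imposm import \
#     -connection postgis://underfoot:underfoot@localhost/underfoot_osm?prefix=NONE \
#     -mapping imposm-mapping.yml -read data.osm.pbf \
#     -limitto pack.geojson -write -deployproduction -overwritecache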


def is_osm_loaded():
    """Check if OSM data has been loaded into the database"""
con = database_connection()
# Check to see if table exists and has rows
cur1 = con.cursor()
osm_loaded = False
try:
cur1.execute("SELECT count(*) FROM ways")
        # If the table already has rows, a previous import succeeded
row = cur1.fetchone()
if row[0] > 0:
print("ways table has rows. Use --clean to force a new import")
osm_loaded = True
except psycopg2.errors.UndefinedTable:
osm_loaded = False
con.close()
return osm_loaded


def load_ways_data(data_path, pack=None):
"""Load ways from OSM data"""
if not is_osm_loaded():
load_osm_from_pbf(data_path, pack)
util.run_sql(f"DROP TABLE IF EXISTS {WAYS_TABLE_NAME}", dbname=DBNAME)
util.run_sql(
f"""
CREATE TABLE {WAYS_TABLE_NAME} AS
SELECT
id,
COALESCE(
tags -> 'ref', tags -> 'tiger:name_base', tags -> 'name'
) AS name,
tags -> 'highway' AS highway,
linestring
FROM ways
WHERE
tags -> 'highway' IS NOT NULL
""",
dbname=DBNAME
)


def load_natural_ways_data(data_path, pack=None):
"""Load natural ways from OSM data"""
if not is_osm_loaded():
load_osm_from_pbf(data_path, pack)
util.run_sql(f"DROP TABLE IF EXISTS {NATURAL_WAYS_TABLE_NAME}", dbname=DBNAME)
# TODO think about how to add a column to control zoom level, maybe using
# the length of the diagonal of the bounding box
# ROUND(ST_Length(ST_Simplify(ST_ChaikinSmoothing(linestring), 0.001)::geography)) AS length_m,
util.run_sql(
f"""
CREATE TABLE {NATURAL_WAYS_TABLE_NAME} AS
SELECT
id,
COALESCE(
tags -> 'ref', tags -> 'tiger:name_base', tags -> 'name'
) AS name,
tags -> 'natural' AS "natural",
ROUND(st_length(st_boundingdiagonal(linestring))::numeric, 2) AS diag_deg,
ST_ChaikinSmoothing(ST_Simplify(linestring, 200), 3) AS linestring
FROM natural_ways
WHERE
tags -> 'natural' IS NOT NULL
AND tags -> 'highway' IS NULL
AND COALESCE(
tags -> 'ref', tags -> 'tiger:name_base', tags -> 'name'
) IS NOT NULL
""",
dbname=DBNAME
)
# Delete the natural lines that were intended to be polygons (or are empty)
util.run_sql(
f"""
DELETE FROM {NATURAL_WAYS_TABLE_NAME}
WHERE
"natural" IS NOT NULL
AND ST_StartPoint(linestring) = ST_EndPoint(linestring)
""",
dbname=DBNAME
)


def load_natural_nodes_data(data_path, pack=None):
"""Load natural nodes from OSM data"""
if not is_osm_loaded():
load_osm_from_pbf(data_path, pack)
util.run_sql(f"DROP TABLE IF EXISTS {NATURAL_NODES_TABLE_NAME}", dbname=DBNAME)
util.run_sql(
f"""
CREATE TABLE {NATURAL_NODES_TABLE_NAME} AS
SELECT
id,
COALESCE(
tags -> 'ref', tags -> 'tiger:name_base', tags -> 'name'
) AS name,
            tags -> 'natural' AS "natural",
tags -> 'ele' AS elevation_m,
tags -> 'intermittent' AS intermittent,
geom
FROM natural_nodes
WHERE
tags -> 'natural' IN ('peak', 'saddle', 'spring')
""",
dbname=DBNAME
)


def load_place_nodes_data(data_path, pack=None):
    """Load place nodes from OSM data"""
if not is_osm_loaded():
load_osm_from_pbf(data_path, pack)
util.run_sql(f"DROP TABLE IF EXISTS {PLACE_NODES_TABLE_NAME}", dbname=DBNAME)
util.run_sql(
f"""
CREATE TABLE {PLACE_NODES_TABLE_NAME} AS
SELECT
id,
COALESCE(
name, tags -> 'name'
) AS name,
tags -> 'place' AS place,
tags -> 'population' AS population,
geom
FROM place_nodes
""",
dbname=DBNAME
)


def make_ways_mbtiles(path):
"""Export ways into the MBTiles using different zoom levels for different types"""
if os.path.exists(path):
os.remove(path)
gpkg_path = f"{util.basename_for_path(path)}.gpkg"
zooms = (
(3, 6, ('motorway',)),
(7, 10, ('motorway', 'primary', 'trunk')),
(11, 12, (
'motorway',
'primary',
'trunk',
'secondary',
'tertiary',
'motorway_link'
)),
(13, 13, ())
)
    for idx, (minzoom, maxzoom, types) in enumerate(zooms):
cmd = ["ogr2ogr"]
if idx != 0:
cmd += ["-update"]
cmd += [
gpkg_path,
f"PG:dbname={DBNAME}",
WAYS_TABLE_NAME,
"-nln", f"{WAYS_TABLE_NAME}_{minzoom}_{maxzoom}"
]
        if types:
types_list = ", ".join([f"'{t}'" for t in types])
cmd += ["-where", f"highway IN ({types_list})"]
util.call_cmd(cmd)
conf = {
f"{WAYS_TABLE_NAME}_{minzoom}_{maxzoom}": {
"target_name": WAYS_TABLE_NAME,
"minzoom": minzoom,
"maxzoom": maxzoom
} for minzoom, maxzoom, types in zooms}
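    # conf maps each per-zoom GeoPackage layer back onto one MVT layer. A
    # sketch of the generated JSON for the first zoom range:
    #   {"underfoot_ways_3_6": {"target_name": "underfoot_ways",
    #                           "minzoom": 3, "maxzoom": 6}, ...}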
cmd = f"""
ogr2ogr {path} {gpkg_path}
-dsco MAX_SIZE=5000000
-dsco MINZOOM={min([row[0] for row in zooms])}
        -dsco MAXZOOM={max([row[1] for row in zooms])}
-dsco CONF='{json.dumps(conf)}'
"""
    # Some weirdness: if you call subprocess.run with a list like I usually
    # do, it fails because it will pass the CONF arg like -dsco "CONF=blah",
    # and ogr2ogr does not like that: it just silently ignores that dsco.
    # Instead, I'm constructing a string version of the command and passing
    # it with shell=True so it doesn't try to do any fancy string escaping.
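    # In other words, the list form that silently fails looks something like
    # this (sketch):
    #   util.call_cmd(
    #       ["ogr2ogr", path, gpkg_path, "-dsco", f"CONF={json.dumps(conf)}"])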
util.call_cmd(re.sub(r'\s+', " ", cmd).strip(), shell=True)
os.remove(gpkg_path)


def make_context_mbtiles(path):
"""Make context mbtiles"""
if os.path.exists(path):
os.remove(path)
gpkg_path = f"{util.basename_for_path(path)}.gpkg"
tables = [
NATURAL_NODES_TABLE_NAME,
NATURAL_WAYS_TABLE_NAME,
PLACE_NODES_TABLE_NAME
]
cmds = [
["ogr2ogr", gpkg_path, f"PG:dbname={DBNAME}", table, "-nln", table]
for table in tables
]
for idx, cmd in enumerate(cmds):
if idx > 0:
cmd += ["-update"]
util.call_cmd(cmd)
conf = {
table: {"target_name": table}
for table in tables
}
cmd = f"""
ogr2ogr {path} {gpkg_path}
-dsco MAX_SIZE=5000000
-dsco MINZOOM=7
-dsco MAXZOOM=14
-dsco CONF='{json.dumps(conf)}'
"""
util.call_cmd(re.sub(r'\s+', " ", cmd).strip(), shell=True)
os.remove(gpkg_path)


def make_ways(pbf_url, clean=False, pack=None, path="./ways.mbtiles"):
    r"""Make an MBTiles file of OSM ways data given an OSM PBF export URL

    Parameters
    ----------
    pbf_url : str
        The URL of a PBF export of OSM data
    clean : bool
        Force a fresh download and database import
    pack : dict
        Pack object
    path : str
        Path to the output MBTiles file
    """
# Bail if no PBF URL
    if not pbf_url:
raise ValueError("You must specify a PBF URL")
filename = fetch_data(pbf_url, clean=clean)
if clean:
con = database_connection(recreate=True)
con.close()
load_ways_data(filename, pack=pack)
make_ways_mbtiles(path)
return path
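
# A usage sketch (illustrative URL; the pack argument, when given, should be
# a dict with a "geojson_path" key, per load_osm_from_pbf above):
#   make_ways(
#       "https://download.geofabrik.de/north-america/us/california-latest.osm.pbf",
#       path="./ways.mbtiles")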


def make_context(pbf_url, clean=False, pack=None, path="./context.mbtiles"):
    """Make an MBTiles file of contextual geographic info from OSM"""
    if not pbf_url:
        raise ValueError("You must specify a PBF URL")
filename = fetch_data(pbf_url, clean=clean)
if clean:
con = database_connection(recreate=True)
con.close()
load_natural_ways_data(filename, pack=pack)
load_natural_nodes_data(filename, pack=pack)
load_place_nodes_data(filename, pack=pack)
    make_context_mbtiles(path)
    return path
if __name__ == "__main__":
PATH = make_ways(sys.argv[0])
print(f"Created mbtiles at {PATH}")