-
Notifications
You must be signed in to change notification settings - Fork 0
/
structure.py
2827 lines (2166 loc) · 106 KB
/
structure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Structural stuff, like deleting or renaming cols, and functions that query structure related properties
TODO: Migrate some of these functions to info, structure should be things that ALTER structure, and not query it. However, there would be some crossover risking circular references
"""
import enum as _enum
import os.path as _path
from copy import deepcopy as _deepcopy
from warnings import warn as _warn
import string
from arcpy.management import CreateFeatureclass, AddJoin, AddRelate, AddFields, AddField, DeleteField, AlterField, Delete, DomainToTable, TableToDomain # noqa Add other stuff as find it useful ...
from arcpy import Exists # noqa
# See https://pro.arcgis.com/en/pro-app/latest/tool-reference/data-management/create-domain.htm
from arcpy.management import CreateDomain, AlterDomain, AssignDomainToField, AddCodedValueToDomain, SortCodedValueDomain # noqa
from arcpy.management import CreateFeatureclass, CreateTable, AssignDefaultToField, CreateFileGDB # noqa
from arcpy.da import Describe # noqa this is the dict version of Describe, much easier to work with in python
from arcpy.conversion import ExcelToTable, ExportTable, ExportFeatures, TableToExcel, TableToGeodatabase, TableToDBASE, TableToSAS # noqa
import numpy as _np
import pandas as _pd
import fuckit as _fuckit
import arcpy as _arcpy
import funclite.iolib as _iolib
import funclite.baselib as _baselib
import funclite.stringslib as _stringslib
import docs.excel as _excel
import arcproapi.common as _common
# Functions, imported for convenience
from arcproapi.common import get_row_count2 as rowcnt # noqa kept to not break code
from arcproapi.common import get_row_count2 as get_row_count2 # noqa
from arcproapi.common import get_row_count as get_row_count # noqa
from arcproapi.common import EnumFieldTypeText # noqa
from arcproapi.common import field_name_clean # noqa
from arcproapi.enums import EnumFieldProperties # noqa
import arcproapi.environ as _environ
import arcproapi.errors as _errors
import arcproapi.decs as _decs
_str2lst = lambda v: list(v) if isinstance(v, str) else v
_tolst = lambda v: v if isinstance(v, list) else list(v)
class FieldMap:
    """Instantiable class representing a remap from alternate field names to one canonical name.

    Matching is case insensitive.

    Examples:
        The correct field name is "country_name", but "country", "countryname"
        and "COUNTRYNAME" occur in the data:

        >>> CountryMap = FieldMap('country_name', ['countryname', 'country'])
    """

    def __init__(self, field_name: str, field_alternates: list):
        # the canonical name that every alternate should be renamed to
        self.field_name = field_name
        # a bare string is accepted and normalised to a one-element list
        self.field_alternatives = _str2lst(field_alternates)
class FieldsRemapper:
    """
    Pass instances of FieldMap and feature classes/tables, and remap (rename)
    the fields of those layers.

    Field alternatives that do not occur in a given fc/table are ignored, i.e.
    no error is raised - this keeps the class flexible. A precheck verifies
    that none of the rename-to names already exist where an alternative name
    is also present.

    Raises:
        errors.FeatureClassOrTableNotFound: If any fnames did not exist
        errors.StructFieldExists: If any fields to rename to exist in FieldMaps (i.e. attribute FieldMap.field_name)
    """

    def __init__(self, fnames: (str, list[str]), FieldMaps: (FieldMap, list[FieldMap])):
        self.fnames = list(map(_path.normpath, _str2lst(fnames)))
        self.FieldMaps = _tolst(FieldMaps)

        ok, d = self._validate_fnames()
        if not ok:
            raise _errors.FeatureClassOrTableNotFound('FieldsRemapper had invalid fnames %s' % d['bad'])

        bad = self._remap(check_only=True)[1]
        if bad:
            raise _errors.StructFieldExists('Rename to fields found.:\n%s\n\nCorrect your FieldMaps' % bad)
    # TODO: Test FieldsRemapper

    def remap(self):
        """
        Remap i.e. rename fields in feature classes/tables self.fnames according to the list of FieldMaps

        Returns:
            list of renamed fields for each layer and list of failed renames - IN THAT ORDER
            Renames are keyed by the fc/table, with values as nested list of the "[old name, new name]"
            {'lyr1': [['oldname', 'newname'], ...], 'lyr2': [['oldname', 'newname'], ...], ...},
            {'lyr1': [['fieldold', fieldnew'], ...], ...}

        Examples:
            Successful rename
            >>> fnames = ['C:/my.gdb/country_asia', 'C:/my.gdb/country_europe']
            >>> Mappers = [FieldMap('population', ['popn', 'pop']), FieldMap('country', ['cnt', 'name'])]
            >>> FieldsRemapper(fnames, Mappers).remap()
            {'C:/my.gdb/country_asia': [['popn', 'population']], 'C:/my.gdb/country_europe': [['pop', 'population'], ['name', 'country']]}
        """
        return self._remap(check_only=False)

    def _remap(self, check_only: bool):
        """
        Remap i.e. rename fields in feature classes/tables self.fnames according to the list of FieldMaps

        Args:
            check_only (bool): only report what would happen; perform no renames

        Returns:
            dict, dict, (dict, None): good, bad, failed - IN THAT ORDER.
            good renames are keyed by the fc/table, values are nested [old name, new name] lists.
            A bad field name only occurs where the new name already exists in a given fc/table.
            failed is None when check_only is True.
            {'lyr1': [['oldname', 'newname'], ...], 'lyr2': [['oldname', 'newname'], ...], ...},
            {'lyr1': ['field1', 'field2']},
            {'lyr1': [['fieldold', fieldnew'], ...], ...}
        """
        bad = _baselib.DictList()  # fields whose proper name already exists in lyr: {'lyr1': ['field1', 'field2']}
        good = _baselib.DictList()  # renames to perform: {'lyr1': [['oldname', 'newname'], ...], ...}
        failed = _baselib.DictList()  # renames that raised an error

        for fname in self.fnames:
            FM: FieldMap
            fields = list(map(str.lower, fc_fields_get(fname)))
            # The order in this loop is important: it is only an error if the
            # proper name already exists in a given layer AND we were going to
            # do the rename because an alternative name also exists there.
            for FM in self.FieldMaps:
                for altfld in FM.field_alternatives:
                    if field_exists(fname, altfld):
                        if FM.field_name.lower() in fields:
                            bad[fname] = FM.field_name
                        else:
                            good[fname] = [altfld, FM.field_name]

        # get out of here, we havent asked for a rename
        if check_only:
            return good, bad, None

        # do the rename
        for fname, maps in good.items():
            for old, new in maps:
                try:
                    AlterField(fname, old, new, clear_field_alias=True)
                except Exception:  # was a bare except; narrowed so SystemExit/KeyboardInterrupt propagate
                    failed[fname] = [old, new]
        return good, bad, failed

    def _validate_fnames(self):
        """
        Validate that every member of self.fnames exists.

        Returns:
            bool, dict: ok, {'good':[..., ...], 'bad': ['...', .,,]}
        """
        # Fix: the original body was a docstring only, so the method returned
        # None and __init__'s "ok, d = self._validate_fnames()" raised a
        # TypeError on every construction.
        good, bad = [], []
        for f in self.fnames:
            (good if _arcpy.Exists(f) else bad).append(f)
        return not bad, {'good': good, 'bad': bad}
# Exposed as a class, as expect to expand functionality over time
class RelationshipMapper:
    """Scan a geodatabase for primary/foreign key candidates (by name suffix)
    and create one-to-many relationships between matching tables/fcs.

    A field ending in key_suffix is a PK candidate when its prefix equals the
    table's basename (e.g. table "country" with field "countryid"); any other
    field with the suffix is treated as an FK candidate. Relationships are
    created in __init__ via _load() then _create().
    """

    def __init__(self, gdb: str, key_suffix='id', ri_check: bool = True):
        # gdb (str): the geodatabase to scan
        # key_suffix (str): suffix identifying key fields (default 'id')
        # ri_check (bool): passed through to gdb_rel_one_to_many_create
        self.gdb = _path.normpath(gdb)
        self._key_suffix = key_suffix
        # parallel lists: [0] = fc/table paths, [1] = lower-cased field names
        self._list_fk = [[], []]
        self._list_pk = [[], []]
        self._ri_check = ri_check
        self._load()
        self._create()
    # TODO: Expand this - at the moment it does not for example cater for the use of OIDs as foreign keys, which is a common use case. Also support inclusions and exclusions, probably as a dict

    def _load(self):
        # Populate the pk/fk candidate lists by inspecting every field of
        # every table and feature class in the gdb.
        for fname in _baselib.list_flatten(list(gdb_tables_and_fcs_list(self.gdb, full_path=True))):
            for fld in fields_get(fname):
                # does the field name end with the key suffix (case-insensitive)?
                if fld[len(fld) - len(self._key_suffix):].lower() == self._key_suffix.lower():
                    if _path.basename(fname).lower() == fld[0: len(fld) - len(self._key_suffix)].lower():  # this is the pk field
                        self._list_pk[0] += [fname]
                        self._list_pk[1] += [fld.lower()]
                    else:  # fk field candidate
                        self._list_fk[0] += [fname]
                        self._list_fk[1] += [fld.lower()]

    def _create(self, show_progress=True):
        # Create a one-to-many relationship for every fk candidate whose field
        # name matches a pk candidate's field name. Failures are printed, not raised.
        if show_progress:
            PP = _iolib.PrintProgress(iter_=zip(self._list_pk[0], self._list_pk[1]), init_msg='\nCreating relationships in %s ... ' % self.gdb)
        for fname, fld in zip(self._list_pk[0], self._list_pk[1]):
            for n, v in enumerate(zip(self._list_fk[0], self._list_fk[1])):
                if v[1] == fld.lower():
                    try:
                        gdb_rel_one_to_many_create(fname, fld, v[0], v[1], ri_check=self._ri_check)
                    except Exception as err:
                        # best-effort: report and continue with remaining candidates
                        print('\nFailed to add relationship [%s,%s,%s,%s].\nThe error was:%s\n' % (fname, fld, v[0], v[1], getattr(err, 'message', repr(err))))
            # NOTE(review): increment placed per-PK to match the PP iterable
            # (zip of the pk lists) - source indentation was ambiguous; confirm
            if show_progress:
                PP.increment()  # noqa
def memory_lyr_get(workspace='in_memory') -> str:
    """Generate a random 8-character layer name within the given workspace, for use as a temp layer.

    Args:
        workspace (str): workspace root, default 'in_memory'

    Returns:
        str: tmp layer pointer

    Examples:
        >>> memory_lyr_get()
        'in_memory/arehrwfs
    """
    rnd_name = _stringslib.rndstr(from_=string.ascii_lowercase)
    return '{}/{}'.format(workspace, rnd_name)
def memory_lyr_copy_to(fname: str) -> str:
    """
    Export feature class fname into in_memory and return the in_memory layer ref.

    Args:
        fname: fname

    Returns:
        Path to the created in_memory layer, a copy of the fname fc
    """
    dest = memory_lyr_get()
    ExportFeatures(_path.normpath(fname), dest)
    return dest
def field_alter(fname: str, field_name: str, **kwargs) -> bool:
    """
    Delegates to arcpy.management.AlterField, but checks the field exists first.

    Args:
        fname (str): fname
        field_name (str): field name
        kwargs: kwargs passed to AlterField. Def: AlterField(new_field_name=None, new_field_alias=None, field_type=None, field_length=None, field_is_nullable=None, clear_field_alias=None)

    Returns:
        bool: True if altered, otherwise False (i.e. the field did not exist)

    Notes:
        See https://pro.arcgis.com/en/pro-app/latest/tool-reference/data-management/alter-field-properties.htm
    """
    # Fix 1: removed "field_alter.__doc__ += ..." which appended AlterField's
    # docstring to this function's docstring on *every call*, growing it unboundedly.
    # Fix 2: the exists-check was inverted - it returned False when the field
    # DID exist and called AlterField on fields that did not exist.
    fname = _path.normpath(fname)
    if not field_exists(fname, field_name):
        return False
    AlterField(fname, field_name, **kwargs)
    return True
def field_oid(fname):
    """Return the name of the ObjectID field of table/feature class fname."""
    return _arcpy.Describe(_path.normpath(fname)).OIDFieldName
def field_shp(fname) -> (str, None):
    """(str)->str
    Return the name of the Shape (Geometry) field of feature class fname.

    Args:
        fname (str): path to feature class/table etc.

    Returns:
        str: Name of the shape/geometry field
        None: If fname is not a feature class (Describe has no shapeFieldName)
    """
    fname = _path.normpath(fname)
    # getattr with a default is equivalent to catching the AttributeError
    # raised for tables, which have no shapeFieldName
    return getattr(_arcpy.Describe(fname), 'shapeFieldName', None)


field_shape = field_shp  # noqa
def fields_delete_not_in(fname, not_in):
    """
    Delete every non-required field of fname whose name is not in not_in.
    Case insensitive.

    Args:
        fname (str): path to feature class
        not_in (iter): iterable of field names to keep

    Examples:
        >>> fields_delete_not_in('c:/lyr.shp', ('cola', 'colb'))

    Notes:
        ArcGISPro now supports this natively, see management.DeleteField
        https://pro.arcgis.com/en/pro-app/latest/tool-reference/data-management/delete-field.htm
        This method retained to not break code.
    """
    keep = {s.lower() for s in not_in}
    fname = _path.normpath(fname)
    doomed = [f.name for f in _arcpy.ListFields(fname)
              if f.name.lower() not in keep and not f.required]
    _arcpy.DeleteField_management(fname, doomed)
def fields_delete(fname, fields: (str, list[str], None) = None, where: (str, None) = None, show_progress: bool = False) -> (tuple[list[str]], None):
    """
    Delete fields that are in the list "fields" OR that match "where".

    ** USE WITH EXTREME CARE. THIS CANNOT BE UNDONE!!! **

    Args:
        fname (str): Feature class or table
        fields (str, list[str], None): list of field names. Cannot be used if where is specified. Accepts a string.
        where (str, None): where, passed to ListFields to identify fields. This cannot be used with the fields argument.
        show_progress (bool): Show progress

    Raises:
        ValueError: If where and fields are not None.

    Returns:
        tuple[list[str]]: A tuple of 2 lists, (success, failure). Returns None if no matches.

    Notes:
        Call arcpy.ListFields, passing the where argument.
        Raises a python warning if the delete raises an error
    """
    good = []
    bad = []
    all_flds = []
    fname = _path.normpath(fname)
    if where and fields:
        raise ValueError('The "where" and "fields" argument cannot both be passed. Use one or the other')
    if isinstance(fields, str): fields = [fields]
    # deepcopy so the caller's list is never mutated
    flds = _deepcopy(fields)
    if not flds:
        flds = []
    if where:
        # NOTE(review): ListFields returns arcpy Field objects, not names, so
        # DeleteField and the warning below receive Field objects on this
        # branch - presumably DeleteField stringifies them; confirm
        flds = _arcpy.ListFields(fname, where)
    else:
        # fields branch: validate each requested name against the actual field list
        all_flds = field_list(fname)
    if not flds:
        return None  # noqa
    if show_progress:
        PP = _iolib.PrintProgress(iter_=flds, init_msg='Deleting %s fields...' % len(flds))
    for f in flds:
        try:
            if where:
                DeleteField(fname, f)
                good += [f]
            else:
                # case-insensitive membership test against the layer's fields
                if f.lower() in map(str.lower, all_flds):
                    DeleteField(fname, f)
                    good += [f]
                else:
                    bad += [f]
        except Exception as e:
            # best-effort: record the failure and warn rather than aborting the batch
            bad += [f]
            _warn('Failed to delete field "%s". The error was:\n\n%s' % (f, e))
        if show_progress:
            PP.increment()  # noqa
    return good, bad
def fc_fields_not_required(fname: str, full_name: bool = True) -> list[str]:
    """
    Get list of fields that are not required.

    Args:
        fname (str): fc
        full_name (bool): Get the fully qualified name, otherwise just gets the basenames

    Returns:
        list[str]: list of fields that are not required, i.e. field.required is false
    """
    fname = _path.normpath(fname)
    out = []
    for fld in _arcpy.da.Describe(fname)['fields']:
        if fld.required:
            continue
        out.append(_iolib.fixp(fname, fld.name) if full_name else fld.name)
    return out
def fc_fields_required(fname: str, full_name: bool = True) -> list[str]:
    """
    Get list of fields that are required.

    Args:
        fname (str): fc
        full_name (bool): Get the fully qualified name, otherwise just gets the basenames

    Returns:
        list[str]: list of fields that are required, i.e. field.required is True
    """
    fname = _path.normpath(fname)
    required = (fld.name for fld in _arcpy.da.Describe(fname)['fields'] if fld.required)
    if full_name:
        return [_iolib.fixp(fname, name) for name in required]
    return list(required)
def fc_fields_not_editable(fname: str, full_name: bool = True) -> list[str]:
    """
    Get list of fields that are not editable.

    Args:
        fname (str): fc
        full_name (bool): Get the fully qualified name, otherwise just gets the basenames

    Returns:
        list[str]: list of fields that are not editable, i.e. field.editable is False
    """
    # (docstring corrected: this filters on field.editable, not field.required)
    def _f(s):
        return _iolib.fixp(fname, s) if full_name else s
    fname = _path.normpath(fname)
    return [_f(fld.name) for fld in _arcpy.da.Describe(fname)['fields'] if not fld.editable]
def fc_fields_editable(fname: str, full_name: bool = True, exclude_shape: bool = False) -> list[str]:
    """
    Get list of fields that are editable.

    Args:
        fname (str): fc
        full_name (bool): Get the fully qualified name, otherwise just gets the basenames
        exclude_shape: Shape@ are editable, optionally exclude the shape field

    Returns:
        list[str]: list of fields where field.editable is True
    """
    # (docstring corrected: this filters on field.editable, not field.required)
    def _f(s):
        return _iolib.fixp(fname, s) if full_name else s
    fname = _path.normpath(fname)
    flds = _arcpy.da.Describe(fname)['fields']
    if exclude_shape:
        # Fix: field_shape() returns None for tables; the original called
        # .lower() on it unconditionally, raising AttributeError for tables.
        shp = field_shape(fname)
        shp = shp.lower() if shp else ''
        return [_f(fld.name) for fld in flds if fld.editable and fld.name.lower() != shp]
    return [_f(fld.name) for fld in flds if fld.editable]
def fc_fields_alias_clear(fname: str, where: str = '', fields: iter = None, show_progress: bool = False) -> list:
    """
    Clear field aliases that match either the fields list or the where.

    Args:
        fname (str): feature/table name
        where (str): where, passed to arcpy.ListFields. Use "*" for all fields
        fields (iter): list of fields
        show_progress (bool): Show progress

    Raises:
        ValueError: If fields AND where both evaluate to True (e.g. where='*' and fields=['myfield'])

    Returns:
        list: field names with cleared aliases

    Examples:
        # Clear all aliases in countries
        >>> fc_fields_alias_clear('C:/my.gdb/countries', where='*')

        # Clear aliases in fields total_population and male_population
        >>> fc_fields_alias_clear('C:/my.gdb/countries', fields=('total_population', 'male_population'))
    """
    out = []
    fname = _path.normpath(fname)
    if where and fields:
        raise ValueError('Pass "fields" or "where", not both.')
    flds = _deepcopy(fields)
    if not fields:
        # ListFields yields arcpy Field objects, not names - normalised below
        flds = _arcpy.ListFields(fname, where)
    if show_progress:
        PP = _iolib.PrintProgress(iter_=flds, init_msg='Resetting aliases....')
    for f in flds:
        # Fix 1: Field objects from ListFields are converted to their names
        # before being passed to AlterField and collected.
        fld_name = f if isinstance(f, str) else f.name
        AlterField(fname, fld_name, clear_field_alias='CLEAR_ALIAS')
        # Fix 2: the original did "out += f", which extended the result with
        # the individual *characters* of the field name.
        out.append(fld_name)
        if show_progress:
            PP.increment()  # noqa
    return out
def fc_delete(fname: str) -> bool:
    """Delete a table or feature class if it exists.

    This method is currently superfluous - use arcpy.management.Delete. See note.

    Returns:
        bool: False if fname does not exist, True if fname existed and was deleted.

    Notes:
        arcpy 3.0.1 - arcpy.management.Delete appears to have changed and no longer raises an error if the layer does not exist.
        It is unknown if this is a deliberate design decision or will be reverted at some later date.
    """
    fname = _path.normpath(fname)
    if not _arcpy.Exists(fname):
        return False
    Delete(fname)
    return True
def fc_delete2(fname: str, err_on_not_exists: bool = False, data_type: str = None) -> None:
    """Delete a table or feature class.

    arcpy 3.0.1 - arcpy.management.Delete appears to have changed and no longer raises an error if the layer does not exist.

    Args:
        fname (str): feature class or table path
        err_on_not_exists (bool): raise an error if featureclass/table does not exist
        data_type (str): data type to delete, see https://pro.arcgis.com/en/pro-app/2.9/tool-reference/data-management/delete.htm

    Raises:
        arcpy.ExecuteError: Raised if err_on_not_exists and the layer does not exist.
            Other execution failures are re-raised as arcpy.ExecuteError.

    Returns: None

    Examples:
        >>> fc_delete2('c:/my.gdb/this_does_not_exist', err_on_not_exists=False)
    """
    fname = _path.normpath(fname)
    try:
        _arcpy.management.Delete(fname, data_type=data_type)
    except _arcpy.ExecuteError as e:
        missing = 'does not exist' in str(e)
        if missing and not err_on_not_exists:
            # caller asked us not to care about missing layers - swallow
            return
        if missing:
            raise _arcpy.ExecuteError('%s does not exist' % fname) from e
        raise _arcpy.ExecuteError from e
def fc_field_types_get(fname, filterer=None) -> list:
    """Return the list of column types of a table.

    Args:
        fname: input table or table view
        filterer: optional callable; only fields for which filterer(field) is truthy are listed

    Example:
        >>> types('c:\\foo\\bar.shp', lambda f: f.name.startswith('eggs'))  # noqa
    """
    if filterer is None:
        filterer = lambda a: True
    fields = _arcpy.ListFields(_path.normpath(fname))
    return [fld.type for fld in fields if filterer(fld)]
def fcs_delete(fnames, err_on_not_exists=False):
    """Delete each feature class/table in fnames if it exists.

    Args:
        fnames (list, tuple): iterable of feature class or table path names
        err_on_not_exists (bool): raise an error if a member does not exist

    Raises: Raises an error if err_on_not_exists and an error occurs

    Examples:
        >>> fcs_delete(['c:/my.gdb/layer1', 'c:/my.gdb/layer2'])
    """
    for member in fnames:
        normed = _path.normpath(member)
        try:
            fc_delete2(normed, err_on_not_exists)
        except Exception:
            if err_on_not_exists:
                raise
# aide memoir declaration: re-export arcpy's TableToDomain under a domain_* name for discoverability
domain_from_table = TableToDomain
def domain_from_excel(geodb: str, domain_name: str, xl_workbook: str, xl_sheet: str = '', xl_table: str = '', xl_range: str = ''):
    """
    Create a domain in a geodatabase from an excel range.

    The excel range can be a table, or a standard A1B1 type reference.

    Args:
        geodb: The geodatabase to add the domain to
        domain_name: The name of the domain to create
        xl_workbook: The workbook (.xlsx file)
        xl_sheet: The worksheet, pass to improve efficiency when looking for a table or when defining with A1:B1 xl_range (range)
        xl_table: Table (ListObject) name
        xl_range: "A1:B1" type range definition

    Raises:
        ValueError: If xl_table and xl_range both evaluate to True
    """
    if xl_table and xl_range:
        raise ValueError('Passed an excel range and table. Use one or the other')
    # TODO: Debug/test domain_from_excel
    with _excel.ExcelAsDataFrame(xl_workbook, worksheet=xl_sheet, table=xl_table, range_=xl_range) as Excel:
        # presumably df_lower is a lower-cased DataFrame view of the range;
        # .copy() so the data survives the context manager - confirm against docs.excel
        df = Excel.df_lower.copy()
    # flatten all columns into a single list of candidate code values
    vals = _baselib.list_flatten(list(df.to_dict(orient='list').values()))
    domain_create2(geodb, domain_name, codes=vals)
def domain_create(geodb: str, domain_name: str, codes: (list, tuple), descriptions: (list, tuple) = (), update_option='REPLACE'):
    """
    Import as a domain into a geodatabase

    Args:
        geodb (str): The geodatabase
        domain_name (str): Name of the domain
        codes (list, tuple): List of the actual code values, can be int, float etc
        descriptions (list, tuple): list of the descriptions, if evaluates to False then codes will also be used as the descriptions
        update_option (str): Either 'APPEND' or 'REPLACE'. See https://pro.arcgis.com/en/pro-app/latest/tool-reference/data-management/table-to-domain.htm

    Raises:
        ValueError: If descriptions evaluates to True and len(descriptions) != len(codes)

    Returns:
        None
    """
    if codes and descriptions:
        if len(codes) != len(descriptions):
            raise ValueError('descriptions were provided, but the length of the iterable does not match the number of codes')
    geodb = _path.normpath(geodb)
    # Fix 1: the descriptions branch built a *generator*, which numpy cannot
    # turn into a structured array - build concrete lists of tuples instead.
    if descriptions:
        code_desc = [(x, y) for x, y in zip(codes, descriptions)]
    else:
        code_desc = [(v, v) for v in codes]
    if not code_desc:
        return  # nothing to import; max() below would raise on empty input
    # Fix 2: dtype was hard-coded ('S3'/'S50'), silently truncating codes longer
    # than 3 bytes and descriptions longer than 50. Size unicode fields from the
    # data instead ('U' also handles non-ASCII, which 'S' bytes did not).
    code_len = max(len(str(c)) for c, _ in code_desc)
    desc_len = max(len(str(d)) for _, d in code_desc)
    array = _np.array(code_desc, dtype=[('code', 'U%s' % code_len), ('value', 'U%s' % desc_len)])
    table = 'in_memory/table'
    _arcpy.da.NumPyArrayToTable(array, table)
    # NB: You have to close and reopen any active client sessions before this appears as at ArcGISPro 3.0.1. Refreshing the geodb doesnt even work.
    _arcpy.management.TableToDomain(table, 'code', 'value', geodb, domain_name, domain_name, update_option=update_option)  # noqa
def domain_create2(geodb: str, domain_name: str, codes: list, descs: list = None, **kwargs) -> None:
    """
    Create a coded domain in the geodatabase geodb.

    Range domains not yet supported.
    This is an enhancement over domain_create as it autodetects the field type. domain_create adds as 'TEXT' field type.

    Args:
        geodb (str): The geodatabase
        domain_name: the domain name
        codes: list of codes
        descs: list of descriptions, just sets descriptions to the corresponding code "if not descs"
        kwargs:
            Passed to arcpy.management.CreateDomain.
            See https://pro.arcgis.com/en/pro-app/latest/tool-reference/data-management/create-domain.htm#

    Raises:
        ValueError: If descs and len(codes) != len(descs). i.e. we pass descriptions, but each code doesnt have a description, or vica-versa

    Returns:
        None

    Notes:
        Will undoubtedly error if the domain is assigned to a field.
        Integer types have field type arcpy LONG, float types are set to arcpy FLOAT.
        Code will need revising if these types are of insufficient size.
        See https://pro.arcgis.com/en/pro-app/latest/tool-reference/data-management/create-domain.htm
    """
    # Fix 1: the original tested "if not vals" on a map object, which is always
    # truthy, so the empty-input guard was dead code - guard on codes instead.
    if not codes:
        return
    if not descs:
        descs = codes
    if len(descs) != len(codes):
        raise ValueError('If "descs" is passed, its length must be the same as "codes"')
    geodb = _path.normpath(geodb)

    isint = all(_baselib.is_int(x) for x in codes)
    isfloat = all(_baselib.is_float(x) for x in codes)
    isdate = all(_baselib.is_date(x) for x in codes)
    if isint:
        vals = map(int, codes)
        ft = 'LONG'
    elif isfloat:
        vals = map(float, codes)
        ft = 'FLOAT'
    elif isdate:
        # Fix 2: the original referenced _dateutil, which is never imported,
        # raising NameError whenever all codes parsed as dates. pandas is
        # already imported at module level - use it to parse instead.
        vals = (ts.to_pydatetime() for ts in _pd.to_datetime(list(codes)))
        ft = 'DATE'
    else:
        vals = map(str, codes)
        ft = 'TEXT'

    # best-effort drop of any existing domain with this name
    with _fuckit:
        _arcpy.management.DeleteDomain(geodb, domain_name)

    # domain description is at the domain level and NOT the values for the domain - they are set in the for v in vals loop below
    _arcpy.management.CreateDomain(geodb, domain_name=domain_name,
                                   domain_description=kwargs['domain_description'] if kwargs.get('domain_description', None) else domain_name,
                                   field_type=ft, domain_type='CODED')
    for i, v in enumerate(vals):
        _arcpy.management.AddCodedValueToDomain(geodb, domain_name, v, descs[i])
@_decs.environ_persist
def domains_assign(fname: str, domain_field_dict: dict[str: list[list, str]], error_on_failure: bool = False, show_progress: bool = False) -> dict[str:list[str]]:
    """
    Assign domains to multiple fields in layer fname.

    Args:
        fname (str): The feature class/table
        domain_field_dict (dict):
            A dictionary of domain names, as already defined in the geodatabase, with the column name(s) as a list, also accepts the col name as a string.
            The keys of dict can also be an enumeration, where the enumeration class name is the domain name.
        error_on_failure: Raise an error on failure, otherwise errors can be detected in the returned dictionary, with item key "fail"
        show_progress (bool): show progress

    Returns:
        dict[str:list[str]]: A dictionary of successes and failues {'success':[...], 'fail':[...]}

    Notes:
        Further work on supporting linking domains with python enums is anticipated. Hence the enum support for field keys.

    Examples:
        domain1, valid fields, domain2 - invalid
        >>> domains_assign({'domain1': ['field11','field12'], 'domain2': 'field2_DOESNOTEXIST'})  # noqa
        {'success': ['domain1:field1', 'domain1:field12'], 'fail': ['domain2:field2:DOESNOTEXIST']}

        Now passing an enum
        >>> class E(enum.Enum): a = 1  # noqa
        >>> domains_assign({E: 'field1'})
        {'success':'domain1:field1'], 'fail': [])
    """
    # TODO: Debug/test domains_assign
    fname = _path.normpath(fname)
    # workspace mutation is presumably undone by @_decs.environ_persist - confirm decorator semantics
    _arcpy.env.workspace = _common.workspace_from_fname(fname, simple_gdb_test=True)
    failed = []
    success = []
    if show_progress: PP = _iolib.PrintProgress(iter_=domain_field_dict.items(), init_msg='Setting domains ...')  # noqa
    for dname, cols in domain_field_dict.items():
        if isinstance(dname, _enum.EnumMeta):
            # an enum class key: its class name doubles as the domain name
            dname = dname.__name__
        if isinstance(cols, str):
            cols = [cols]
        for col in cols:
            if error_on_failure:
                # let any exception from AssignDomainToField propagate
                AssignDomainToField(fname, col, dname)
                success += ['%s:%s' % (dname, col)]
            else:
                try:
                    AssignDomainToField(fname, col, dname)
                    success += ['%s:%s' % (dname, col)]
                except Exception as e:
                    # schema locks are common enough to warrant a dedicated warning
                    if 'schema lock' in str(e):
                        _warn('\nAssignment of domain enum "%s" to table/fc "%s" failed for field "%s". *** Schema Lock ***' % (dname, fname, col))
                        serr = '**schema lock**'
                    else:
                        serr = str(e)
                    if show_progress: print('\nError assigning domain "%s:%s".\n%s' % (dname, col, serr))
                    failed += ['%s:%s %s' % (dname, col, serr)]
        if show_progress:
            PP.increment()  # noqa
    return {'success': success, 'fail': failed}
def gdb_domains_as_dict(gdb: str) -> dict[str:dict[str:str]]:
    """
    Get a dictionary of all domains in a geodatabase

    Args:
        gdb (str): gdb

    Returns:
        dict[str:dict[str:str]]: dictionary of form {domain_name:{coded value: description}, domain_name:{coded value: description}, ... NB: coded value is min and max for range domains. See example.
        If no domains an empty dict is returned

    Examples:
        One coded value domain in the gdb
        >>> gdb_domains_as_dict('C:/my.gdb')
        {'ColoursDomain': {'blue': 'The colour blue', 'red': 'the colour red', ...}

        One range value domain in gdb.
        >>> gdb_domains_as_dict('C:/my.gdb')
        {'RangeDomain': {'min': 0, 'max': 10)}
    """
    out = {}
    for domain in _arcpy.da.ListDomains(_path.normpath(gdb)):
        out[domain.name] = {}
        if domain.domainType == 'CodedValue':
            # Fix: Domain.codedValues is a dict *property*, not a method - the
            # original called it (TypeError). Iterate items() to get
            # (coded value, description) pairs.
            for v, desc in domain.codedValues.items():
                out[domain.name][v] = desc
        elif domain.domainType == 'Range':
            out[domain.name]['min'] = domain.range[0]
            out[domain.name]['max'] = domain.range[1]
    return out


domains_as_dict = gdb_domains_as_dict
def gdb_domain_exists(gdb: str, domain: str) -> bool:
    """Check if a domain exists in gdb by name.
    Case insensitive.

    Args:
        gdb: the geodatabase (normpathed)
        domain: the domain name

    Returns:
        bool: True if exists.
    """
    target = domain.lower()
    return any(d.name.lower() == target for d in _arcpy.da.ListDomains(_path.normpath(gdb)))
def cleanup(fname_list, verbose=False, **args):
    """Delete items in fname_list and return the number of items that could not be deleted.

    This function uses the fc_delete function, which in turn uses
    _arcpy.Exists and _arcpy.management.Delete. The deletion is wrapped in a try
    statement so failed deletions are skipped silently.

    Required:
        fname_list -- iterable of items to delete

    Optional:
        verbose -- suppress messages if False (default), otherwise print messages
        **args -- keyword arguments 'timef' and 'log' for function 'msg'

    Example:
        >>> cleanup(['c:\\foo\\bar.shp', 'lyr', 'c:\\foo\\eggs.tif'])
    """
    cnt = 0
    for i in fname_list:
        try:
            deleted = fc_delete(i)
        except Exception:  # was a bare except; narrowed so KeyboardInterrupt/SystemExit propagate
            deleted = False
        if deleted:
            m = "Cleanup deleted " + str(i)
        else:
            # Fix: cnt was initialised but never incremented, so the promised
            # "number of items that could not be deleted" was always 0.
            cnt += 1
            m = "Cleanup could not delete " + str(i)
        _common.msg(m, args.get('timef', '%Y-%m-%d %H:%M:%S'), verbose, args.get('log', None))
    return cnt
def fcs_list_all(gdb, wild: str = '*', ftype: str = 'All', rel: bool = False):
    """Return a list of all feature classes in a geodatabase.

    if rel is True, only relative paths will be returned. If
    false, the full path to each feature class is returned.

    Relative path Example:
        Utilities\Storm_Mh
        Utilities\Storm_Cb
        Transportation\Roads

    Args:
        gdb (str): Geodatabase containing feature classes to be listed
        wild (str): wildcard for feature classes. Default is "*"
        ftype (str): feature class type. Default is 'All'
            Valid values:
            Annotation - Only annotation feature classes are returned.
            Arc - Only arc (or line) feature classes are returned.
            Dimension - Only dimension feature classes are returned.
            Edge - Only edge feature classes are returned.
            Junction - Only junction feature classes are returned.
            Label - Only label feature classes are returned.
            Line - Only line (or arc) feature classes are returned.
            Multipatch - Only multipatch feature classes are returned.
            Node - Only node feature classes are returned.
            Point - Only point feature classes are returned.
            Polygon - Only polygon feature classes are returned.
            Polyline - Only line (or arc) feature classes are returned.
            Region - Only region feature classes are returned.
            Route - Only route feature classes are returned.
            Tic - Only tic feature classes are returned.
            All - All datasets in the workspace. This is the default value.
            Table - Only tables
        rel (bool): Option to have relative paths. Default is false;
            will include full paths unless rel is set to True

    Returns:
        List: List of all feature class paths

    Notes:
        Sets (and leaves mutated) arcpy.env.workspace.

    Examples:
        >>> # Return relative paths for fc
        >>> gdb_ = 'C:/TEMP/test.gdb'
        >>> for fc in getFCPaths(gdb_, rel=True):  # noqa
        >>>     pass
    """
    # feature type (add all in case '' is returned
    # from script tool
    if not ftype:
        ftype = 'All'
    _arcpy.env.workspace = gdb
    # loop through feature classes
    feats = []
    # Add top level fc's (not in feature data sets)
    # NOTE(review): ListFeatureClasses can return None in some workspaces,
    # which would raise here - confirm behaviour against the target gdb types
    feats += _arcpy.ListFeatureClasses(wild, ftype)
    # loop through feature datasets
    for fd in _arcpy.ListDatasets('*', 'Feature'):
        # point the workspace at each feature dataset in turn
        _arcpy.env.workspace = _environ.workspace_set(_path.normpath(_path.join(gdb, fd)))
        feats += [_path.join(fd, fc) for fc in
                  _arcpy.ListFeatureClasses(wild, ftype)]
    # return list of features, relative pathed or full pathed
    if rel:
        return sorted(feats)
    else:
        return sorted([_path.join(gdb, ft) for ft in feats])
def field_list_compare(fname: str, col_list: list, **kwargs):
    """Compare the field list from fname with col_list, returning the
    symmetric difference as a dict.

    Args:
        fname (str): feature class
        col_list (list): list of col names to check
        **kwargs (any): keyword args to pass to struct.field_list

    Returns:
        dict: {'a_notin_b':[..], 'a_and_b':[...], 'b_notin_a':[...]},
        where "a" is the feature class cols and "b" is the cols from col_list
        .. so in_feature_class, in_both, in_list

    Examples:
        >>> field_list_compare('c:/my.gdb/lyr', ['OBJECTID', 'colb', 'colc'], shape=True)
        {'a_notin_b':['fname_col1'], 'a_and_b':['OBJECTID'], 'b_notin_a':['colb','colc']}
    """
    fc_cols = field_list(fname, **kwargs)
    return _baselib.list_sym_diff(fc_cols, col_list)
def gdb_field_generator(gdb: str, wild_card='*', field_type: str = 'All', as_objs: bool = False) -> tuple[str]:
    """
    Yields a tuple of (feature class/table path, field) for every field in the geodatabase.

    Args:
        gdb (str): Path the geodatabase, probably will work with enterprise geodatabases, but currently untested
        wild_card (str): Filter field names
        field_type (str): Field type filter. IN ('All', 'BLOB', 'Date', 'Double', 'Geometry', 'GlobalID', 'GUID', 'Integer', 'OID', 'Raster', 'Single', 'SmallInteger', 'String')
        as_objs (bool): Yield an ArcPy.Field object instead of a str

    Notes:
        Simply calls structure.table_field_generator, passing the arguments as-is.

    Examples:
        >>> for tbl, field_name in gdb_field_generator('C:/my.gdb')
        >>> print(tbl, fname)
        'C:/my.gdb/countries', 'country_name'
        'C:/my.gdb/countries', 'populations'
    """
    feature_classes, tables = gdb_tables_and_fcs_list(gdb, full_path=True)
    for src in feature_classes + tables:
        for fld in table_field_generator(src, wild_card=wild_card, field_type=field_type, as_objs=as_objs):
            yield src, fld
def table_field_generator(fname: str, wild_card='*', field_type: str = 'All', as_objs: bool = False) -> str:
"""
Yields field names, or optionally a field instance
Args:
fname (str): The feature class or table
wild_card (str): Wildcard match
field_type (str): Field type filter. IN ('All', 'BLOB', 'Date', 'Double', 'Geometry', 'GlobalID', 'GUID', 'Integer', 'OID', 'Raster', 'Single', 'SmallInteger', 'String')
as_objs (bool): Yield an ArcPy.Field object instead of a str
Yields:
None: No matches on wild_card
str: Field name if as_obj = False
arcpy.Field: An instance of arcpy.Field if as_obj=True