diff --git a/.travis.yml b/.travis.yml index 6eaec67d4..6ed642f26 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,20 @@ +dist: trusty # jdk 8 not available on xenial language: java -jdk: -- oraclejdk8 +language: java +java: + - oraclejdk8 install: true sudo: false # Install mongoDB to perform persistence tests -services: mongodb +services: + - mongodb + - postgresql +addons: + postgresql: 9.6 cache: directories: - - "$HOME/.m2" - - "$HOME/.cache/yarn" + - $HOME/.m2 + - $HOME/.cache/yarn # Install semantic-release before_script: - yarn global add @conveyal/maven-semantic-release semantic-release@15 @@ -17,6 +23,8 @@ before_install: # set region in AWS config for S3 setup - mkdir ~/.aws && printf '%s\n' '[default]' 'aws_access_key_id=foo' 'aws_secret_access_key=bar' 'region=us-east-1' > ~/.aws/config - cp configurations/default/server.yml.tmp configurations/default/server.yml +# create database for tests +- psql -U postgres -c 'CREATE DATABASE catalogue;' script: # package jar - mvn package diff --git a/pom.xml b/pom.xml index 82b51df18..23d177cab 100644 --- a/pom.xml +++ b/pom.xml @@ -243,11 +243,22 @@ 2.1.0 - + + + junit + junit + 4.12 + test + + + com.conveyal gtfs-lib - 4.3.2 + 4.3.4 @@ -313,14 +324,6 @@ 19.2 - - - junit - junit - 4.12 - test - - com.bugsnag @@ -363,6 +366,16 @@ 2.14.0 test + + + net.sf.supercsv + super-csv + 2.4.0 + diff --git a/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java b/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java index b88c120ed..0d401b2f2 100644 --- a/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java +++ b/src/main/java/com/conveyal/datatools/common/status/MonitorableJob.java @@ -57,7 +57,7 @@ public enum JobType { EXPORT_SNAPSHOT_TO_GTFS, CONVERT_EDITOR_MAPDB_TO_SQL, VALIDATE_ALL_FEEDS, - MERGE_PROJECT_FEEDS + MERGE_FEED_VERSIONS } public MonitorableJob(String owner, String name, JobType type) { diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java index 268bd08a9..a8be71617 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java @@ -1,8 +1,11 @@ package com.conveyal.datatools.manager.controllers.api; +import com.conveyal.datatools.common.utils.SparkUtils; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.jobs.CreateFeedVersionFromSnapshotJob; +import com.conveyal.datatools.manager.jobs.MergeFeedsJob; +import com.conveyal.datatools.manager.jobs.MergeFeedsType; import com.conveyal.datatools.manager.jobs.ProcessSingleFeedJob; import com.conveyal.datatools.manager.models.FeedDownloadToken; import com.conveyal.datatools.manager.models.FeedSource; @@ -13,6 +16,7 @@ import com.conveyal.datatools.manager.persistence.Persistence; import com.conveyal.datatools.manager.utils.HashUtils; import com.conveyal.datatools.manager.utils.json.JsonManager; + import com.fasterxml.jackson.databind.JsonNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,6 +27,8 @@ import java.io.File; import java.util.Collection; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import static com.conveyal.datatools.common.utils.S3Utils.downloadFromS3; import static 
com.conveyal.datatools.common.utils.SparkUtils.copyRequestStreamIntoFile; @@ -30,6 +36,7 @@ import static com.conveyal.datatools.common.utils.SparkUtils.formatJobMessage; import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt; import static com.conveyal.datatools.manager.controllers.api.FeedSourceController.checkFeedSourcePermissions; +import static com.conveyal.datatools.manager.jobs.MergeFeedsType.REGIONAL; import static spark.Spark.delete; import static spark.Spark.get; import static spark.Spark.post; @@ -262,6 +269,52 @@ private static FeedVersion publishToExternalResource (Request req, Response res) } } + /** + * HTTP controller that handles merging multiple feed versions for a given feed source, with version IDs specified + * in a comma-separated string in the feedVersionIds query parameter and merge type specified in mergeType query + * parameter. NOTE: REGIONAL merge type should only be handled through {@link ProjectController#mergeProjectFeeds(Request, Response)}. + */ + private static String mergeFeedVersions(Request req, Response res) { + String[] versionIds = req.queryParams("feedVersionIds").split(","); + // Try to parse merge type (null or bad value throws IllegalArgumentException). + MergeFeedsType mergeType; + try { + mergeType = MergeFeedsType.valueOf(req.queryParams("mergeType")); + if (mergeType.equals(REGIONAL)) { + throw new IllegalArgumentException("Regional merge type is not permitted for this endpoint."); + } + } catch (IllegalArgumentException e) { + logMessageAndHalt(req, 400, "Must provide valid merge type.", e); + return null; + } + // Collect versions to merge (must belong to same feed source). + Set versions = new HashSet<>(); + String feedSourceId = null; + for (String id : versionIds) { + FeedVersion v = Persistence.feedVersions.getById(id); + if (v == null) { + logMessageAndHalt(req, + 400, + String.format("Must provide valid version ID. (No version exists for id=%s.)", id) + ); + } + // Store feed source id and check other versions for matching. + if (feedSourceId == null) feedSourceId = v.feedSourceId; + else if (!v.feedSourceId.equals(feedSourceId)) { + logMessageAndHalt(req, 400, "Cannot merge versions with different parent feed sources."); + } + versions.add(v); + } + if (versionIds.length != 2) { + logMessageAndHalt(req, 400, "Merging more than two versions is not currently supported."); + } + // Kick off merge feeds job. + Auth0UserProfile userProfile = req.attribute("user"); + MergeFeedsJob mergeFeedsJob = new MergeFeedsJob(userProfile.getUser_id(), versions, "merged", mergeType); + DataManager.heavyExecutor.execute(mergeFeedsJob); + return SparkUtils.formatJobMessage(mergeFeedsJob.jobId, "Merging feed versions..."); + } + /** * Download locally stored feed version with token supplied by this application. This method is only used when * useS3 is set to false. Otherwise, a direct download from s3 should be used. 
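The handler above is registered as PUT secure/feedversion/merge (see the register method in the next hunk). For illustration only, a minimal client sketch follows; the host, the /api/manager/ prefix, the bearer token, and the two version IDs are assumptions, while the route, the feedVersionIds and mergeType query parameters, and the MTC merge type come from this change.

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Scanner;

public class MergeFeedVersionsClientSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical deployment host and API prefix; adjust for the actual server configuration.
        String versionIds = URLEncoder.encode("versionA,versionB", "UTF-8");
        URL url = new URL("http://localhost:4000/api/manager/secure/feedversion/merge"
            + "?feedVersionIds=" + versionIds + "&mergeType=MTC");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Bearer <auth0-token>");
        try (InputStream in = conn.getInputStream(); Scanner s = new Scanner(in).useDelimiter("\\A")) {
            // The handler responds with a job message containing the MergeFeedsJob ID to poll for status.
            System.out.println(s.next());
        }
    }
}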
@@ -300,6 +353,7 @@ public static void register (String apiPrefix) { post(apiPrefix + "secure/feedversion", FeedVersionController::createFeedVersionViaUpload, json::write); post(apiPrefix + "secure/feedversion/fromsnapshot", FeedVersionController::createFeedVersionFromSnapshot, json::write); put(apiPrefix + "secure/feedversion/:id/rename", FeedVersionController::renameFeedVersion, json::write); + put(apiPrefix + "secure/feedversion/merge", FeedVersionController::mergeFeedVersions, json::write); post(apiPrefix + "secure/feedversion/:id/publish", FeedVersionController::publishToExternalResource, json::write); delete(apiPrefix + "secure/feedversion/:id", FeedVersionController::deleteFeedVersion, json::write); diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java index c5caba46f..cc4db1c1c 100644 --- a/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java +++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java @@ -5,8 +5,9 @@ import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.jobs.FetchProjectFeedsJob; import com.conveyal.datatools.manager.jobs.MakePublicJob; -import com.conveyal.datatools.manager.jobs.MergeProjectFeedsJob; +import com.conveyal.datatools.manager.jobs.MergeFeedsJob; import com.conveyal.datatools.manager.models.FeedDownloadToken; +import com.conveyal.datatools.manager.models.FeedSource; import com.conveyal.datatools.manager.models.FeedVersion; import com.conveyal.datatools.manager.models.JsonViews; import com.conveyal.datatools.manager.models.Project; @@ -20,6 +21,8 @@ import spark.Response; import java.util.Collection; +import java.util.HashSet; +import java.util.Set; import java.util.stream.Collectors; import static com.conveyal.datatools.common.utils.S3Utils.downloadFromS3; @@ -27,6 +30,7 @@ import static com.conveyal.datatools.common.utils.SparkUtils.formatJobMessage; import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt; import static com.conveyal.datatools.manager.DataManager.publicPath; +import static com.conveyal.datatools.manager.jobs.MergeFeedsType.REGIONAL; import static spark.Spark.delete; import static spark.Spark.get; import static spark.Spark.post; @@ -216,14 +220,28 @@ private static Project checkProjectPermissions(Request req, Project project, Str * to getFeedDownloadCredentials with the project ID to obtain either temporary S3 credentials or a download token * (depending on application configuration "application.data.use_s3_storage") to download the zip file. */ - private static String downloadMergedFeed(Request req, Response res) { + static String mergeProjectFeeds(Request req, Response res) { Project project = requestProjectById(req, "view"); Auth0UserProfile userProfile = req.attribute("user"); // TODO: make this an authenticated call? 
- MergeProjectFeedsJob mergeProjectFeedsJob = new MergeProjectFeedsJob(project, userProfile.getUser_id()); - DataManager.heavyExecutor.execute(mergeProjectFeedsJob); + Set feedVersions = new HashSet<>(); + // Get latest version for each feed source in project + Collection feedSources = project.retrieveProjectFeedSources(); + for (FeedSource fs : feedSources) { + // check if feed version exists + FeedVersion version = fs.retrieveLatest(); + if (version == null) { + LOG.warn("Skipping {} because it has no feed versions", fs.name); + continue; + } + // modify feed version to use prepended feed id + LOG.info("Adding {} feed to merged zip", fs.name); + feedVersions.add(version); + } + MergeFeedsJob mergeFeedsJob = new MergeFeedsJob(userProfile.getUser_id(), feedVersions, project.id, REGIONAL); + DataManager.heavyExecutor.execute(mergeFeedsJob); // Return job ID to requester for monitoring job status. - return formatJobMessage(mergeProjectFeedsJob.jobId, "Merge operation is processing."); + return formatJobMessage(mergeFeedsJob.jobId, "Merge operation is processing."); } /** @@ -310,7 +328,7 @@ public static void register (String apiPrefix) { post(apiPrefix + "secure/project/:id/fetch", ProjectController::fetch, json::write); post(apiPrefix + "secure/project/:id/deployPublic", ProjectController::publishPublicFeeds, json::write); - get(apiPrefix + "secure/project/:id/download", ProjectController::downloadMergedFeed); + get(apiPrefix + "secure/project/:id/download", ProjectController::mergeProjectFeeds); get(apiPrefix + "secure/project/:id/downloadtoken", ProjectController::getFeedDownloadCredentials, json::write); get(apiPrefix + "public/project/:id", ProjectController::getProject, json::write); diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/CalendarAttribute.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/CalendarAttribute.java new file mode 100644 index 000000000..10cea50c0 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/CalendarAttribute.java @@ -0,0 +1,20 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import javax.naming.OperationNotSupportedException; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class CalendarAttribute extends Entity { + + private static final long serialVersionUID = 1L; + + public String service_id; + public String service_description; + + @Override public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/Direction.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/Direction.java new file mode 100644 index 000000000..4d618a12a --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/Direction.java @@ -0,0 +1,22 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class Direction extends Entity { + + private static final long serialVersionUID = 1L; + + public String route_id; + public int direction_id; + public String direction; + + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a 
GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/FareRiderCategory.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/FareRiderCategory.java new file mode 100644 index 000000000..6d759c062 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/FareRiderCategory.java @@ -0,0 +1,24 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.time.LocalDate; + +public class FareRiderCategory extends Entity { + + private static final long serialVersionUID = 1L; + + public String fare_id; + public int rider_category_id; + public double price; + public LocalDate expiration_date; + public LocalDate commencement_date; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/FareZoneAttribute.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/FareZoneAttribute.java new file mode 100644 index 000000000..099f81b33 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/FareZoneAttribute.java @@ -0,0 +1,20 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class FareZoneAttribute extends Entity { + + private static final long serialVersionUID = 1L; + + public String zone_id; + public String zone_name; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/GtfsPlusTable.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/GtfsPlusTable.java new file mode 100644 index 000000000..2255e36d6 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/GtfsPlusTable.java @@ -0,0 +1,97 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.loader.DateField; +import com.conveyal.gtfs.loader.DoubleField; +import com.conveyal.gtfs.loader.IntegerField; +import com.conveyal.gtfs.loader.ShortField; +import com.conveyal.gtfs.loader.StringField; +import com.conveyal.gtfs.loader.Table; + +import static com.conveyal.gtfs.loader.Requirement.OPTIONAL; +import static com.conveyal.gtfs.loader.Requirement.PROPRIETARY; +import static com.conveyal.gtfs.loader.Requirement.REQUIRED; + +/** + * This class contains GTFS+ table definitions that are based on gtfs-lib's {@link Table} constants. + * Currently, these are only used when operating on tables being merged within + * {@link com.conveyal.datatools.manager.jobs.MergeFeedsJob}. The definition of these tables can be + * found at https://www.transitwiki.org/TransitWiki/images/e/e7/GTFS%2B_Additional_Files_Format_Ver_1.7.pdf. 
+ */ +public class GtfsPlusTable { + public static final Table REALTIME_ROUTES = new Table("realtime_routes", RealtimeRoute.class, PROPRIETARY, + new StringField("route_id", REQUIRED).isReferenceTo(Table.ROUTES), + new ShortField("realtime_enabled", REQUIRED, 1), + new StringField("realtime_routename", REQUIRED), + new StringField("realtime_routecode", REQUIRED) + ); + + public static final Table REALTIME_STOPS = new Table("realtime_stops", RealtimeStop.class, PROPRIETARY, + new StringField("trip_id", REQUIRED).isReferenceTo(Table.TRIPS), + new StringField("stop_id", REQUIRED).isReferenceTo(Table.STOPS), + new StringField("realtime_stop_id", REQUIRED) + ).keyFieldIsNotUnique(); + + public static final Table DIRECTIONS = new Table("directions", Direction.class, PROPRIETARY, + new StringField("route_id", REQUIRED).isReferenceTo(Table.ROUTES), + new ShortField("direction_id", REQUIRED, 1), + new StringField("direction", REQUIRED) + ).keyFieldIsNotUnique(); + + public static final Table REALTIME_TRIPS = new Table("realtime_trips", RealtimeTrip.class, PROPRIETARY, + new StringField("trip_id", REQUIRED).isReferenceTo(Table.TRIPS), + new StringField("realtime_trip_id", REQUIRED) + ); + + public static final Table STOP_ATTRIBUTES = new Table("stop_attributes", StopAttribute.class, PROPRIETARY, + new StringField("stop_id", REQUIRED).isReferenceTo(Table.STOPS), + new ShortField("accessibility_id", REQUIRED, 8), + new StringField("cardinal_direction", OPTIONAL), + new StringField("relative_position", OPTIONAL), + new StringField("stop_city", REQUIRED) + ); + + public static final Table TIMEPOINTS = new Table("timepoints", TimePoint.class, PROPRIETARY, + new StringField("trip_id", REQUIRED).isReferenceTo(Table.TRIPS), + new StringField("stop_id", REQUIRED).isReferenceTo(Table.STOPS) + ).keyFieldIsNotUnique(); + + public static final Table RIDER_CATEGORIES = new Table("rider_categories", RiderCategory.class, PROPRIETARY, + new IntegerField("rider_category_id", REQUIRED, 1, 25), + new StringField("rider_category_description", REQUIRED) + ); + + public static final Table FARE_RIDER_CATEGORIES = new Table("fare_rider_categories", FareRiderCategory.class, PROPRIETARY, + new StringField("fare_id", REQUIRED), + new IntegerField("rider_category_id", REQUIRED, 2, 25).isReferenceTo(RIDER_CATEGORIES), + new DoubleField("price", REQUIRED, 0, Double.MAX_VALUE, 2), + new DateField("expiration_date", OPTIONAL), + new DateField("commencement_date", OPTIONAL) + ).keyFieldIsNotUnique(); + + public static final Table CALENDAR_ATTRIBUTES = new Table("calendar_attributes", CalendarAttribute.class, PROPRIETARY, + new StringField("service_id", REQUIRED).isReferenceTo(Table.CALENDAR), + new StringField("service_description", REQUIRED) + ); + + public static final Table FAREZONE_ATTRIBUTES = new Table("farezone_attributes", FareZoneAttribute.class, PROPRIETARY, + new StringField("zone_id", REQUIRED), + new StringField("zone_name", REQUIRED) + ); + + /** + * List of tables in the order such that internal references can be appropriately checked as + * tables are loaded/encountered. 
+ */ + public static final Table[] tables = new Table[] { + REALTIME_ROUTES, + REALTIME_STOPS, + REALTIME_TRIPS, + DIRECTIONS, + STOP_ATTRIBUTES, + TIMEPOINTS, + RIDER_CATEGORIES, + FARE_RIDER_CATEGORIES, + CALENDAR_ATTRIBUTES, + FAREZONE_ATTRIBUTES + }; +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeRoute.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeRoute.java new file mode 100644 index 000000000..e6c996aa5 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeRoute.java @@ -0,0 +1,21 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class RealtimeRoute extends Entity { + private static final long serialVersionUID = 1L; + + public String route_id; + public int realtime_enabled; + public String realtime_routename; + public String realtime_routecode; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeStop.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeStop.java new file mode 100644 index 000000000..51a9f3551 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeStop.java @@ -0,0 +1,21 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class RealtimeStop extends Entity { + + private static final long serialVersionUID = 1L; + + public String trip_id; + public String stop_id; + public String realtime_stop_id; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeTrip.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeTrip.java new file mode 100644 index 000000000..133990209 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RealtimeTrip.java @@ -0,0 +1,20 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class RealtimeTrip extends Entity { + + private static final long serialVersionUID = 1L; + + public String trip_id; + public String realtime_trip_id; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/RiderCategory.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RiderCategory.java new file mode 100644 index 000000000..82442b13b --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/RiderCategory.java @@ -0,0 +1,20 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import 
java.sql.SQLException; + +public class RiderCategory extends Entity { + + private static final long serialVersionUID = 1L; + + public int rider_category_id; + public String rider_category_description; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/StopAttribute.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/StopAttribute.java new file mode 100644 index 000000000..e2c0ba8f8 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/StopAttribute.java @@ -0,0 +1,23 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class StopAttribute extends Entity { + + private static final long serialVersionUID = 1L; + + public String stop_id; + public int accessibility_id; + public String cardinal_direction; + public String relative_position; + public String stop_city; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/TimePoint.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/TimePoint.java new file mode 100644 index 000000000..458ba86e3 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/TimePoint.java @@ -0,0 +1,20 @@ +package com.conveyal.datatools.manager.gtfsplus; + +import com.conveyal.gtfs.model.Entity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class TimePoint extends Entity { + + private static final long serialVersionUID = 1L; + + public String trip_id; + public String stop_id; + + @Override + public void setStatementParameters(PreparedStatement statement, boolean setDefaultId) throws SQLException { + throw new UnsupportedOperationException( + "Cannot call setStatementParameters because loading a GTFS+ table into RDBMS is unsupported."); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/gtfsplus/package-info.java b/src/main/java/com/conveyal/datatools/manager/gtfsplus/package-info.java new file mode 100644 index 000000000..03845bbea --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/gtfsplus/package-info.java @@ -0,0 +1,16 @@ +/** + * This package contains classes that correspond to those found for GTFS entity types in + * {@link com.conveyal.gtfs.model}, but for GTFS+ entity types. It also contains + * {@link com.conveyal.datatools.manager.gtfsplus.GtfsPlusTable}, which extends the + * {@link com.conveyal.gtfs.loader.Table} in order to define a table specification for this set of + * extension tables. + * + * Note: these classes are primarily used for the MTC merge type in + * {@link com.conveyal.datatools.manager.jobs.MergeFeedsJob}. 
There may be an opportunity to also use + * these classes in the GTFS+ validation code path found in + * {@link com.conveyal.datatools.manager.controllers.api.GtfsPlusController}; however, + * TODO a way to define an enum set for string field values would need to first be added to support + * fields such as {@link com.conveyal.datatools.manager.gtfsplus.StopAttribute#cardinal_direction}. + */ +package com.conveyal.datatools.manager.gtfsplus; + diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsJob.java new file mode 100644 index 000000000..5b3d684cf --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsJob.java @@ -0,0 +1,822 @@ +package com.conveyal.datatools.manager.jobs; + +import com.conveyal.datatools.common.status.MonitorableJob; +import com.conveyal.datatools.manager.DataManager; +import com.conveyal.datatools.manager.gtfsplus.GtfsPlusTable; +import com.conveyal.datatools.manager.models.FeedSource; +import com.conveyal.datatools.manager.models.FeedVersion; +import com.conveyal.datatools.manager.persistence.FeedStore; +import com.conveyal.gtfs.error.NewGTFSError; +import com.conveyal.gtfs.error.NewGTFSErrorType; +import com.conveyal.gtfs.loader.Field; +import com.conveyal.gtfs.loader.ReferenceTracker; +import com.conveyal.gtfs.loader.Table; +import com.csvreader.CsvReader; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.supercsv.io.CsvListWriter; +import org.supercsv.prefs.CsvPreference; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.time.LocalDate; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; +import java.util.zip.ZipOutputStream; + +import static com.conveyal.datatools.manager.jobs.MergeFeedsType.MTC; +import static com.conveyal.datatools.manager.jobs.MergeFeedsType.REGIONAL; +import static com.conveyal.datatools.manager.utils.StringUtils.getCleanName; +import static com.conveyal.gtfs.loader.DateField.GTFS_DATE_FORMATTER; +import static com.conveyal.gtfs.loader.Field.getFieldIndex; + +/** + * This job handles merging two or more feed versions according to logic specific to the specified merge type. + * The current merge types handled here are: + * - {@link MergeFeedsType#REGIONAL}: this is essentially a "dumb" merge. For each feed version, each primary key is + * scoped so that there is no possibility that it will conflict with other IDs + * found in any other feed version. Note: There is absolutely no attempt to merge + * entities based on either expected shared IDs or entity location (e.g., stop + * coordinates). + * - {@link MergeFeedsType#MTC}: this strategy is defined in detail at https://github.com/conveyal/datatools-server/issues/185, + * but in essence, this strategy attempts to merge a current and future feed into + * a combined file. For certain entities (specifically stops and routes) it uses + * alternate fields as primary keys (stop_code and route_short_name) if they are + * available. 
There is some complexity related to this in {@link #constructMergedTable(Table, List, ZipOutputStream)}. + * Another defining characteristic is to prefer entities defined in the "future" + * file if there are matching entities in the current file. + * Future merge strategies could be added here. For example, some potential customers have mentioned a desire to + * prefer entities from the "current" version, so that entities edited in Data Tools would override the values found + * in the "future" file, which may have limited data attributes due to being exported from scheduling software with + * limited GTFS support. + * + * Reproduced from https://github.com/conveyal/datatools-server/issues/185 on 2019/04/23: + * + * 1. When a new GTFS+ feed is loaded in TDM, check as part of the loading and validation process if + * the dataset is for a future date. (If all services start in the future, consider the dataset + * to be for the future). + * 2. If it is a future dataset, automatically notify the user that the feed needs to be merged with + * most recent active version or a selected one in order to further process the feed. + * 3. Use the chosen version to merge the future feed. The merging process needs to be efficient so + * that the user doesn’t need to wait more than a tolerable time. + * 4. The merge process shall compare the current and future datasets, validate the following rules + * and generate the Merge Validation Report: + * i. Merging will be based on route_short_name in the current and future datasets. All matching + * route_short_names between the datasets shall be considered same route. Any route_short_name + * in active data not present in the future will be appended to the future routes file. + * ii. Future feed_info.txt file should get priority over active feed file when difference is + * identified. + * iii. When difference is found in agency.txt file between active and future feeds, the future + * agency.txt file data should be used. Possible issue with missing agency_id referenced by routes + * iv. When stop_code is included, stop merging will be based on that. If stop_code is not + * included, it will be based on stop_id. All stops in future data will be carried forward and + * any stops found in active data that are not in the future data shall be appended. If one + * of the feed is missing stop_code, merge fails with a notification to the user with + * suggestion that the feed with missing stop_code must be fixed with stop_code. + * v. If any service_id in the active feed matches with the future feed, it should be modified + * and all associated trip records must also be changed with the modified service_id. + * If a service_id from the active calendar has both the start_date and end_date in the + * future, the service shall not be appended to the merged file. Records in trips, + * calendar_dates, and calendar_attributes referencing this service_id shall also be + * removed/ignored. Stop_time records for the ignored trips shall also be removed. + * If a service_id from the active calendar has only the end_date in the future, the end_date + * shall be set to one day prior to the earliest start_date in future dataset before appending + * the calendar record to the merged file. + * trip_ids between active and future datasets must not match. If any trip_id is found to be + * matching, the merge should fail with appropriate notification to user with the cause of the + * failure. Notification should include all matched trip_ids. + * vi. 
New shape_ids in the future datasets should be appended in the merged feed. + * vii. Merging fare_attributes will be based on fare_id in the current and future datasets. All + * matching fare_ids between the datasets shall be considered same fare. Any fare_id in active + * data not present in the future will be appended to the future fare_attributes file. + * viii. All fare rules from the future dataset will be included. Any identical fare rules from + * the current dataset will be discarded. Any fare rules unique to the current dataset will be + * appended to the future file. + * ix. All transfers.txt entries with unique stop pairs (from - to) from both the future and + * current datasets will be included in the merged file. Entries with duplicate stop pairs from + * the current dataset will be discarded. + * x. All GTFS+ files should be merged based on how the associated base GTFS file is merged. For + * example, directions for routes that are not in the future routes.txt file should be appended + * to the future directions.txt file in the merged feed. + */ +public class MergeFeedsJob extends MonitorableJob { + + private static final Logger LOG = LoggerFactory.getLogger(MergeFeedsJob.class); + public static final ObjectMapper mapper = new ObjectMapper(); + private final Set feedVersions; + private final FeedSource feedSource; + private final ReferenceTracker referenceTracker = new ReferenceTracker(); + public MergeFeedsResult mergeFeedsResult; + private final String filename; + public final String projectId; + public final MergeFeedsType mergeType; + private File mergedTempFile = null; + final FeedVersion mergedVersion; + public boolean failOnDuplicateTripId = true; + + /** + * @param owner user ID that initiated job + * @param feedVersions set of feed versions to merge + * @param file resulting merge filename (without .zip) + * @param mergeType the type of merge to perform (@link MergeFeedsType) + */ + public MergeFeedsJob(String owner, Set feedVersions, String file, + MergeFeedsType mergeType) { + super(owner, mergeType.equals(REGIONAL) ? "Merging project feeds" : "Merging feed versions", + JobType.MERGE_FEED_VERSIONS); + this.feedVersions = feedVersions; + // Grab parent feed source if performing non-regional merge (each version should share the + // same feed source). + this.feedSource = + mergeType.equals(REGIONAL) ? null : feedVersions.iterator().next().parentFeedSource(); + // Construct full filename with extension + this.filename = String.format("%s.zip", file); + // If the merge type is regional, the file string should be equivalent to projectId, which + // is used by the client to download the merged feed upon job completion. + this.projectId = mergeType.equals(REGIONAL) ? file : null; + this.mergeType = mergeType; + // Assuming job is successful, mergedVersion will contain the resulting feed version. + this.mergedVersion = mergeType.equals(REGIONAL) ? null : new FeedVersion(this.feedSource); + this.mergeFeedsResult = new MergeFeedsResult(mergeType); + } + + /** + * The final stage handles clean up (deleting temp file) and adding the next job to process the + * new merged version (assuming the merge did not fail). + */ + public void jobFinished() { + // Delete temp file to ensure it does not cause storage bloat. Note: merged file has already been stored + // permanently. + if (!mergedTempFile.delete()) { + // FIXME: send to bugsnag? + LOG.error( + "Merged feed file {} not deleted. 
This may contribute to storage space shortages.", + mergedTempFile.getAbsolutePath()); + } + } + + /** + * Primary job logic handles collecting and sorting versions, creating a merged table for all versions, and writing + * the resulting zip file to storage. + */ + @Override public void jobLogic() throws IOException { + // Create temp zip file to add merged feed content to. + mergedTempFile = File.createTempFile(filename, null); + mergedTempFile.deleteOnExit(); + // Create the zipfile. + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(mergedTempFile)); + LOG.info("Created project merge file: " + mergedTempFile.getAbsolutePath()); + List feedsToMerge = collectAndSortFeeds(feedVersions); + + // Determine which tables to merge (only merge GTFS+ tables for MTC extension). + final List tablesToMerge = + Arrays.stream(Table.tablesInOrder) + .filter(Table::isSpecTable) + .collect(Collectors.toList()); + if (DataManager.isExtensionEnabled("mtc")) { + // Merge GTFS+ tables only if MTC extension is enabled. We should do this for both + // regional and MTC merge strategies. + tablesToMerge.addAll(Arrays.asList(GtfsPlusTable.tables)); + } + int numberOfTables = tablesToMerge.size(); + // Loop over GTFS tables and merge each feed one table at a time. + for (int i = 0; i < numberOfTables; i++) { + Table table = tablesToMerge.get(i); + if (mergeType.equals(REGIONAL) && table.name.equals(Table.FEED_INFO.name)) { + // It does not make sense to include the feed_info table when performing a + // regional feed merge because this file is intended to contain data specific to + // a single agency feed. + // TODO: Perhaps future work can generate a special feed_info file for the merged + // file. + LOG.warn("Skipping feed_info table for regional merge."); + continue; + } + if (table.name.equals(Table.PATTERNS.name) || table.name.equals(Table.PATTERN_STOP.name)) { + LOG.warn("Skipping editor-only table {}.", table.name); + continue; + } + double percentComplete = Math.round((double) i / numberOfTables * 10000d) / 100d; + status.update("Merging " + table.name, percentComplete); + // Perform the merge. + LOG.info("Writing {} to merged feed", table.name); + int mergedLineNumber = constructMergedTable(table, feedsToMerge, out); + if (mergedLineNumber == 0) { + LOG.warn("Skipping {} table. No entries found in zip files.", table.name); + } else if (mergedLineNumber == -1) { + LOG.error("Merge {} table failed!", table.name); + } + } + // Close output stream for zip file. + out.close(); + // Handle writing file to storage (local or s3). + if (mergeFeedsResult.failed) { + status.fail("Merging feed versions failed."); + } else { + storeMergedFeed(); + status.update(false, "Merged feed created successfully.", 100, true); + } + LOG.info("Feed merge is complete."); + if (!mergeType.equals(REGIONAL) && !status.error && !mergeFeedsResult.failed) { + // Handle the processing of the new version for non-regional merges (note: s3 upload is handled within this job). + // We must add this job in jobLogic (rather than jobFinished) because jobFinished is called after this job's + // subJobs are run. + ProcessSingleFeedJob processSingleFeedJob = + new ProcessSingleFeedJob(mergedVersion, owner, true); + addNextJob(processSingleFeedJob); + } + } + + /** + * Collect zipFiles for each feed version before merging tables. + * Note: feed versions are sorted by first calendar date so that future dataset is iterated over first. 
This is + * required for the MTC merge strategy which prefers entities from the future dataset over past entities. + */ + private List collectAndSortFeeds(Set feedVersions) { + return feedVersions.stream().map(version -> { + try { + return new FeedToMerge(version); + } catch (Exception e) { + LOG.error("Could not create zip file for version {}:", version.parentFeedSource(), + version.version); + return null; + } + }).filter(Objects::nonNull).filter(entry -> entry.version.validationResult != null + && entry.version.validationResult.firstCalendarDate != null) + // MTC-specific sort mentioned in above comment. + // TODO: If another merge strategy requires a different sort order, a merge type check should be added. + .sorted(Comparator.comparing(entry -> entry.version.validationResult.firstCalendarDate, + Comparator.reverseOrder())).collect(Collectors.toList()); + } + + /** + * Handles writing the GTFS zip file to disk. For REGIONAL merges, this will end up in a project subdirectory on s3. + * Otherwise, it will write to a new version. + */ + private void storeMergedFeed() throws IOException { + if (mergeType.equals(REGIONAL)) { + status.update(false, "Saving merged feed.", 95); + // Store the project merged zip locally or on s3 + if (DataManager.useS3) { + String s3Key = String.join("/", "project", filename); + FeedStore.s3Client.putObject(DataManager.feedBucket, s3Key, mergedTempFile); + LOG.info("Storing merged project feed at s3://{}/{}", DataManager.feedBucket, + s3Key); + } else { + try { + FeedVersion.feedStore + .newFeed(filename, new FileInputStream(mergedTempFile), null); + } catch (IOException e) { + e.printStackTrace(); + LOG.error("Could not store feed for project {}", filename); + throw e; + } + } + } else { + // Store the zip file for the merged feed version. + try { + FeedVersion.feedStore + .newFeed(mergedVersion.id, new FileInputStream(mergedTempFile), feedSource); + } catch (IOException e) { + LOG.error("Could not store merged feed for new version"); + throw e; + } + } + } + + /** + * Merge the specified table for multiple GTFS feeds. + * + * @param table table to merge + * @param feedsToMerge map of feedSources to zipFiles from which to extract the .txt tables + * @param out output stream to write table into + * @return number of lines in merged table + */ + private int constructMergedTable(Table table, List feedsToMerge, + ZipOutputStream out) throws IOException { + // CSV writer used to write to zip file. + CsvListWriter writer = new CsvListWriter(new OutputStreamWriter(out), CsvPreference.STANDARD_PREFERENCE); + String keyField = table.getKeyFieldName(); + String orderField = table.getOrderFieldName(); + if (mergeType.equals(MTC)) { + // MTC requires that the stop and route records be merged based on different key fields. + switch (table.name) { + case "stops": + keyField = "stop_code"; + break; + case "routes": + keyField = "route_short_name"; + break; + default: + // Otherwise, use the standard key field (see keyField declaration. + break; + } + } + // Set up objects for tracking the rows encountered + Map rowValuesForStopOrRouteId = new HashMap<>(); + Set rowStrings = new HashSet<>(); + int mergedLineNumber = 0; + // Get the spec fields to export + List specFields = table.specFields(); + boolean stopCodeMissingFromFirstTable = false; + try { + // Iterate over each zip file. + for (int feedIndex = 0; feedIndex < feedsToMerge.size(); feedIndex++) { + boolean keyFieldMissing = false; + // Use for a new agency ID for use if the feed does not contain one. 
Initialize to + // null. If the value becomes non-null, the agency_id is missing and needs to be + // replaced with the generated value stored in this variable. + String newAgencyId = null; + mergeFeedsResult.feedCount++; + FeedToMerge feed = feedsToMerge.get(feedIndex); + FeedVersion version = feed.version; + FeedSource feedSource = version.parentFeedSource(); + // Generate ID prefix to scope GTFS identifiers to avoid conflicts. + String idScope = getCleanName(feedSource.name) + version.version; + CsvReader csvReader = table.getCsvReader(feed.zipFile, null); + // If csv reader is null, the table was not found in the zip file. There is no need + // to handle merging this table for the current zip file. + if (csvReader == null) { + LOG.warn("Table {} not found in the zip file for {}{}", table.name, + feedSource.name, version.version); + continue; + } + LOG.info("Adding {} table for {}{}", table.name, feedSource.name, version.version); + + Field[] fieldsFoundInZip = + table.getFieldsFromFieldHeaders(csvReader.getHeaders(), null); + List fieldsFoundList = Arrays.asList(fieldsFoundInZip); + // Determine the index of the key field for this version's table. + int keyFieldIndex = getFieldIndex(fieldsFoundInZip, keyField); + if (keyFieldIndex == -1) { + LOG.error("No {} field exists for {} table (feed={})", keyField, table.name, + feed.version.id); + keyFieldMissing = true; + // If there is no agency_id for agency table, create one and ensure that + // route#agency_id gets set. + } + int lineNumber = 0; + // Iterate over rows in table, writing them to the out file. + while (csvReader.readRecord()) { + String keyValue = csvReader.get(keyFieldIndex); + if (feedIndex > 0 && mergeType.equals(MTC)) { + // Always prefer the "future" file for the feed_info table, which means + // we can skip any iterations following the first one. If merging the agency + // table, we should only skip the following feeds if performing an MTC merge + // because that logic assumes the two feeds share the same agency (or + // agencies). NOTE: feed_info file is skipped by default (outside of this + // method) for a regional merge), which is why this block is exclusively + // for an MTC merge. Also, this statement may print multiple log + // statements, but it is deliberately nested in the csv while block in + // order to detect agency_id mismatches and fail the merge if found. + if (table.name.equals("feed_info")) { + LOG.warn("Skipping {} file for feed {}/{} (future file preferred)", + table.name, feedIndex, feedsToMerge.size()); + continue; + } else if (table.name.equals("agency")) { + // The second feed's agency table must contain the same agency_id + // value as the first feed. + String agencyId = String.join(":", keyField, keyValue); + if (!"".equals(keyValue) && !referenceTracker.transitIds.contains(agencyId)) { + String otherAgencyId = referenceTracker.transitIds.stream() + .filter(transitId -> transitId.startsWith("agency_id")) + .findAny() + .orElse(null); + String message = String.format( + "MTC merge detected mismatching agency_id values between two " + + "feeds (%s and %s). Failing merge operation.", + agencyId, + otherAgencyId + ); + LOG.error(message); + mergeFeedsResult.failed = true; + mergeFeedsResult.failureReasons.add(message); + return -1; + } + LOG.warn("Skipping {} file for feed {}/{} (future file preferred)", + table.name, feedIndex, feedsToMerge.size()); + continue; + } + } + // Check certain initial conditions on the first line of the file. 
+ if (lineNumber == 0) { + if (table.name.equals(Table.AGENCY.name) && (keyFieldMissing || keyValue.equals(""))) { + // agency_id is optional if only one agency is present, but that will + // cause issues for the feed merge, so we need to insert an agency_id + // for the single entry. + newAgencyId = UUID.randomUUID().toString(); + if (keyFieldMissing) { + // Only add agency_id field if it is missing in table. + List fieldsList = new ArrayList<>(Arrays.asList(fieldsFoundInZip)); + fieldsList.add(Table.AGENCY.fields[0]); + fieldsFoundInZip = fieldsList.toArray(fieldsFoundInZip); + } + fieldsFoundList = Arrays.asList(fieldsFoundInZip); + } + if (mergeType.equals(MTC) && table.name.equals("stops")) { + // For the first line of the stops table, check that the alt. key + // field (stop_code) is present. If it is not, revert to the original + // key field. This is only pertinent for the MTC merge type. + // TODO: Use more sophisticated check for missing stop_codes than + // simply the first line containing the value. + if (feedIndex == 0) { + // Check that the first file contains stop_code values. + if ("".equals(keyValue)) { + LOG.warn( + "stop_code is not present in file {}/{}. Reverting to stop_id", + feedIndex, feedsToMerge.size()); + // If the key value for stop_code is not present, revert to stop_id. + keyField = table.getKeyFieldName(); + keyFieldIndex = table.getKeyFieldIndex(fieldsFoundInZip); + keyValue = csvReader.get(keyFieldIndex); + stopCodeMissingFromFirstTable = true; + } + } else { + // Check whether stop_code exists for the subsequent files. + String firstStopCodeValue = csvReader.get(getFieldIndex(fieldsFoundInZip, "stop_code")); + if (stopCodeMissingFromFirstTable && !"".equals(firstStopCodeValue)) { + // If stop_code was missing from the first file and exists for + // the second, we consider that a failing error. + mergeFeedsResult.failed = true; + mergeFeedsResult.errorCount++; + mergeFeedsResult.failureReasons.add( + "If one stops.txt file contains stop_codes, both feed versions must stop_codes."); + } + } + } + } + boolean skipRecord = false; + String[] rowValues = new String[specFields.size()]; + String[] values = csvReader.getValues(); + if (values.length == 1) { + LOG.warn("Found blank line. Skipping..."); + continue; + } + // Piece together the row to write, which should look practically identical to the original + // row except for the identifiers receiving a prefix to avoid ID conflicts. + for (int specFieldIndex = 0; + specFieldIndex < specFields.size(); specFieldIndex++) { + Field field = specFields.get(specFieldIndex); + // Get index of field from GTFS spec as it appears in feed + int index = fieldsFoundList.indexOf(field); + String val = csvReader.get(index); + // Default value to write is unchanged from value found in csv. + String valueToWrite = val; + // Handle filling in agency_id if missing when merging regional feeds. + if (newAgencyId != null && field.name.equals("agency_id") && mergeType + .equals(REGIONAL)) { + if (val.equals("") && table.name.equals("agency") && lineNumber > 0) { + // If there is no agency_id value for a second (or greater) agency + // record, fail the merge feed job. 
+ String message = String.format( + "Feed %s has multiple agency records but no agency_id values.", + feed.version.id); + mergeFeedsResult.failed = true; + mergeFeedsResult.failureReasons.add(message); + LOG.error(message); + return -1; + } + LOG.info("Updating {}#agency_id to (auto-generated) {} for ID {}", + table.name, newAgencyId, keyValue); + val = newAgencyId; + } + // Determine if field is a GTFS identifier. + boolean isKeyField = + field.isForeignReference() || keyField.equals(field.name); + if (this.mergeType.equals(REGIONAL) && isKeyField && !val.isEmpty()) { + // For regional merge, if field is a GTFS identifier (e.g., route_id, + // stop_id, etc.), add scoped prefix. + valueToWrite = String.join(":", idScope, val); + } + // Only need to check for merge conflicts if using MTC merge type because + // the regional merge type scopes all identifiers by default. Also, the + // reference tracker will get far too large if we attempt to use it to + // track references for a large number of feeds (e.g., every feed in New + // York State). + if (mergeType.equals(MTC)) { + Set idErrors = referenceTracker + .checkReferencesAndUniqueness(keyValue, lineNumber, field, val, + table, keyField, orderField); + // Store values for key fields that have been encountered. + // TODO Consider using Strategy Pattern https://en.wikipedia.org/wiki/Strategy_pattern + // instead of a switch statement. + switch (table.name) { + case "calendar": + // If any service_id in the active feed matches with the future + // feed, it should be modified and all associated trip records + // must also be changed with the modified service_id. + // TODO How can we check that calendar_dates entries are + // duplicates? I think we would need to consider the + // service_id:exception_type:date as the unique key and include any + // all entries as long as they are unique on this key. + if (hasDuplicateError(idErrors)) { + String key = getTableScopedValue(table, idScope, val); + // Modify service_id and ensure that referencing trips + // have service_id updated. + valueToWrite = String.join(":", idScope, val); + mergeFeedsResult.remappedIds.put(key, valueToWrite); + } + // If a service_id from the active calendar has both the + // start_date and end_date in the future, the service will be + // excluded from the merged file. Records in trips, + // calendar_dates, and calendar_attributes referencing this + // service_id shall also be removed/ignored. Stop_time records + // for the ignored trips shall also be removed. + if (feedIndex > 0) { + int startDateIndex = + getFieldIndex(fieldsFoundInZip, "start_date"); + LocalDate startDate = LocalDate + .parse(csvReader.get(startDateIndex), + GTFS_DATE_FORMATTER); + if (startDate.isAfter(LocalDate.now())) { + LOG.warn( + "Skipping calendar entry {} because it operates in the future.", + keyValue); + String key = + getTableScopedValue(table, idScope, keyValue); + mergeFeedsResult.skippedIds.add(key); + skipRecord = true; + continue; + } + // If a service_id from the active calendar has only the + // end_date in the future, the end_date shall be set to one + // day prior to the earliest start_date in future dataset + // before appending the calendar record to the merged file. 
+ int endDateIndex = + getFieldIndex(fieldsFoundInZip, "end_date"); + if (index == endDateIndex) { + LocalDate endDate = LocalDate + .parse(csvReader.get(endDateIndex), + GTFS_DATE_FORMATTER); + if (endDate.isAfter(LocalDate.now())) { + val = feedsToMerge.get( + 0).version.validationResult.firstCalendarDate + .minus(1, ChronoUnit.DAYS) + .format(GTFS_DATE_FORMATTER); + } + } + } + break; + case "trips": + // trip_ids between active and future datasets must not match. If any trip_id is found + // to be matching, the merge should fail with appropriate notification to user with the + // cause of the failure. Merge result should include all conflicting trip_ids. + for (NewGTFSError error : idErrors) { + if (error.errorType.equals(NewGTFSErrorType.DUPLICATE_ID)) { + mergeFeedsResult.failureReasons + .add("Trip ID conflict caused merge failure."); + mergeFeedsResult.idConflicts.add(error.badValue); + mergeFeedsResult.errorCount++; + if (failOnDuplicateTripId) + mergeFeedsResult.failed = true; + skipRecord = true; + } + } + break; + case "stops": + // When stop_code is included, stop merging will be based on that. If stop_code is not + // included, it will be based on stop_id. All stops in future data will be carried + // forward and any stops found in active data that are not in the future data shall be + // appended. If one of the feed is missing stop_code, merge fails with a notification to + // the user with suggestion that the feed with missing stop_code must be fixed with + // stop_code. + // NOTE: route case is also used by the stops case, so the route + // case must follow this block. + case "routes": + boolean useAltKey = + keyField.equals("stop_code") || keyField.equals("route_short_name"); + // First, check uniqueness of primary key value (i.e., stop or route ID) + // in case the stop_code or route_short_name are being used. This + // must occur unconditionally because each record must be tracked + // by the reference tracker. + String primaryKeyValue = + csvReader.get(table.getKeyFieldIndex(fieldsFoundInZip)); + Set primaryKeyErrors = referenceTracker + .checkReferencesAndUniqueness(primaryKeyValue, lineNumber, + field, val, table); + // Merging will be based on route_short_name/stop_code in the current and future datasets. All + // matching route_short_names/stop_codes between the datasets shall be considered same route/stop. Any + // route_short_name/stop_code in active data not present in the future will be appended to the + // future routes/stops file. + if (useAltKey) { + if ("".equals(val)) { + // If alt key is empty (which is permitted), skip + // checking of alt key dupe errors/re-mapping values and + // simply use the primary key (route_id/stop_id). + if (hasDuplicateError(primaryKeyErrors)) { + skipRecord = true; + } + } else if (hasDuplicateError(idErrors)) { + // If we encounter a route/stop that shares its alt. + // ID with a previous route/stop, we need to + // remap its route_id/stop_id field so that + // references point to the previous + // route_id/stop_id. For example, + // route_short_name in both feeds is "ABC" but + // each route has a different route_id (123 and + // 456). This block will map references to 456 to + // 123 so that ABC/123 is the route of record. + //////////////////////////////////////////////////////// + // Get current route/stop ID. (Note: primary + // ID index is always zero because we're + // iterating over the spec fields). 
+ String currentPrimaryKey = rowValues[0]; + // Get unique key to check for remapped ID when + // writing values to file. + String key = + getTableScopedValue(table, idScope, currentPrimaryKey); + // Extract the route/stop ID value used for the + // route/stop with already encountered matching + // short name/stop code. + String[] strings = + rowValuesForStopOrRouteId.get(String.join( + ":", keyField, val)); + String keyForMatchingAltId = strings[0]; + if (!keyForMatchingAltId.equals(currentPrimaryKey)) { + // Remap this row's route_id/stop_id to ensure + // that referencing entities (trips, stop_times) + // have their references updated. + mergeFeedsResult.remappedIds.put(key, keyForMatchingAltId); + } + skipRecord = true; + } + // Next check for regular ID conflicts (e.g., on route_id or stop_id) because any + // conflicts here will actually break the feed. This essentially handles the case + // where two routes have different short_names, but share the same route_id. We want + // both of these routes to end up in the merged feed in this case because we're + // matching on short name, so we must modify the route_id. + if (!skipRecord && !referenceTracker.transitIds + .contains(String.join(":", keyField, keyValue))) { + if (hasDuplicateError(primaryKeyErrors)) { + String key = getTableScopedValue(table, idScope, val); + // Modify route_id and ensure that referencing trips + // have route_id updated. + valueToWrite = String.join(":", idScope, val); + mergeFeedsResult.remappedIds.put(key, valueToWrite); + } + } + } else { + // Key field has defaulted to the standard primary key field + // (stop_id or route_id), which makes the check much + // simpler (just skip the duplicate record). + if (hasDuplicateError(idErrors)) skipRecord = true; + } + + if (newAgencyId != null && field.name.equals("agency_id")) { + LOG.info( + "Updating route#agency_id to (auto-generated) {} for route={}", + newAgencyId, keyValue); + val = newAgencyId; + } + break; + default: + // For any other table, skip any duplicate record. + if (hasDuplicateError(idErrors)) skipRecord = true; + break; + } + } + + if (field.isForeignReference()) { + String key = getTableScopedValue(field.referenceTable, idScope, val); + // If the current foreign ref points to another record that has been skipped, skip this + // record and add its primary key to the list of skipped IDs (so that other references can + // be properly omitted). + if (mergeFeedsResult.skippedIds.contains(key)) { + String skippedKey = getTableScopedValue(table, idScope, keyValue); + if (orderField != null) { + skippedKey = String.join(":", skippedKey, + csvReader.get(getFieldIndex(fieldsFoundInZip, orderField))); + } + mergeFeedsResult.skippedIds.add(skippedKey); + skipRecord = true; + continue; + } + // If the field is a foreign reference, check to see whether the reference has been + // remapped due to a conflicting ID from another feed (e.g., calendar#service_id). + if (mergeFeedsResult.remappedIds.containsKey(key)) { + mergeFeedsResult.remappedReferences++; + // If the value has been remapped update the value to write. + valueToWrite = mergeFeedsResult.remappedIds.get(key); + } + } + rowValues[specFieldIndex] = valueToWrite; + } // End of iteration over each field for a row. + // Do not write rows that are designated to be skipped. 
+ if (skipRecord && this.mergeType.equals(MTC)) { + mergeFeedsResult.recordsSkipCount++; + continue; + } + String newLine = String.join(",", rowValues); + switch (table.name) { + // Store row values for route or stop ID (or alternative ID field) in order + // to check for ID conflicts. NOTE: This is only intended to be used for + // routes and stops. Otherwise, this might (will) consume too much memory. + case "stops": + case "routes": + // FIXME: This should be revised for tables with order fields, but it should work fine for its + // primary purposes: to detect exact copy rows and to temporarily hold the data in case a reference + // needs to be looked up in order to remap an entity to that key. + // Here we need to get the key field index according to the spec + // table definition. Otherwise, if we use the keyFieldIndex variable + // defined above, we will be using the found fields index, which will + // cause major issues when trying to put and get values into the + // below map. + String key = String.join( + ":", keyField, rowValues[table.getFieldIndex(keyField)]); + rowValuesForStopOrRouteId.put(key, rowValues); + break; + case "transfers": + case "fare_rules": + if (!rowStrings.add(newLine)) { + // The line already exists in the output file, do not append it again. This prevents duplicate + // entries for certain files that do not contain primary keys (e.g., fare_rules and transfers) and + // do not otherwise have convenient ways to track uniqueness (like an order field). + // FIXME: add ordinal field/compound keys for transfers (from/to_stop_id) and fare_rules (?). + // Perhaps it makes sense to include all unique fare rules rows, but transfers that share the + // same from/to stop IDs but different transfer times or other values should not both be + // included in the merged feed (yet this strategy would fail to filter those out). + mergeFeedsResult.recordsSkipCount++; + continue; + } + break; + default: + // Do nothing. + break; + + } + // Finally, handle writing lines to zip entry. + if (mergedLineNumber == 0) { + // Create entry for zip file. + ZipEntry tableEntry = new ZipEntry(table.name + ".txt"); + out.putNextEntry(tableEntry); + // Write headers to table. + String[] headers = specFields.stream() + .map(field -> field.name) + .toArray(String[]::new); + writer.write(headers); + } + // Write line to table (plus new line char). + writer.write(rowValues); + lineNumber++; + mergedLineNumber++; + } // End of iteration over each row. + } + writer.flush(); + out.closeEntry(); + } catch (Exception e) { + LOG.error("Error merging feed sources: {}", + feedVersions.stream().map(version -> version.parentFeedSource().name) + .collect(Collectors.toList()).toString()); + e.printStackTrace(); + throw e; + } + // Track the number of lines in the merged table and return final number. + mergeFeedsResult.linesPerTable.put(table.name, mergedLineNumber); + return mergedLineNumber; + } + + /** Checks that any of a set of errors is of the type {@link NewGTFSErrorType#DUPLICATE_ID}. */ + private boolean hasDuplicateError(Set errors) { + for (NewGTFSError error : errors) { + if (error.errorType.equals(NewGTFSErrorType.DUPLICATE_ID)) return true; + } + return false; + } + + /** Get table-scoped value used for key when remapping references for a particular feed. */ + private static String getTableScopedValue(Table table, String prefix, String id) { + return String.join(":", + table.name, + prefix, + id); + } + + /** + * Helper class that collects the feed version and its zip file. 
Note: this class helps with sorting versions to + * merge in a list collection. + */ + private class FeedToMerge { + public FeedVersion version; + public ZipFile zipFile; + + FeedToMerge(FeedVersion version) throws IOException { + this.version = version; + this.zipFile = new ZipFile(version.retrieveGtfsFile()); + } + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsResult.java b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsResult.java new file mode 100644 index 000000000..e971e4fd2 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsResult.java @@ -0,0 +1,40 @@ +package com.conveyal.datatools.manager.jobs; + +import java.io.Serializable; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Contains the result of {@link MergeFeedsJob}. + */ +public class MergeFeedsResult implements Serializable { + private static final long serialVersionUID = 1L; + + /** Number of feeds merged */ + public int feedCount; + public int errorCount; + /** Type of merge operation performed */ + public MergeFeedsType type; + /** Contains a set of strings for which there were error-causing duplicate values */ + public Set idConflicts = new HashSet<>(); + /** Contains the set of IDs for records that were excluded in the merged feed */ + public Set skippedIds = new HashSet<>(); + /** Contains the set of IDs that had their values remapped during the merge */ + public Map remappedIds = new HashMap<>(); + /** Mapping of table name to line count in merged file */ + public Map linesPerTable = new HashMap<>(); + public int remappedReferences; + public int recordsSkipCount; + public Date startTime; + public boolean failed; + /** Set of reasons explaining why merge operation failed */ + public Set failureReasons = new HashSet<>(); + + public MergeFeedsResult (MergeFeedsType type) { + this.type = type; + this.startTime = new Date(); + } +} diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsType.java b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsType.java new file mode 100644 index 000000000..f827c7f96 --- /dev/null +++ b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsType.java @@ -0,0 +1,6 @@ +package com.conveyal.datatools.manager.jobs; + +public enum MergeFeedsType { + REGIONAL, + MTC +} diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/MergeProjectFeedsJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/MergeProjectFeedsJob.java deleted file mode 100644 index 4cf329662..000000000 --- a/src/main/java/com/conveyal/datatools/manager/jobs/MergeProjectFeedsJob.java +++ /dev/null @@ -1,258 +0,0 @@ -package com.conveyal.datatools.manager.jobs; - -import com.conveyal.datatools.common.status.MonitorableJob; -import com.conveyal.datatools.common.utils.Consts; -import com.conveyal.datatools.manager.DataManager; -import com.conveyal.datatools.manager.models.FeedSource; -import com.conveyal.datatools.manager.models.FeedVersion; -import com.conveyal.datatools.manager.models.Project; -import com.conveyal.datatools.manager.persistence.FeedStore; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import 
java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.zip.ZipEntry; -import java.util.zip.ZipFile; -import java.util.zip.ZipOutputStream; - -/** - * Created by landon on 9/19/17. - */ -public class MergeProjectFeedsJob extends MonitorableJob { - - private static final Logger LOG = LoggerFactory.getLogger(MergeProjectFeedsJob.class); - public final Project project; - - public MergeProjectFeedsJob(Project project, String owner) { - super(owner, "Merging project feeds for " + project.name, JobType.MERGE_PROJECT_FEEDS); - this.project = project; - status.message = "Merging feeds..."; - } - - @Override - public void jobLogic () throws IOException { - // get feed sources in project - Collection feeds = project.retrieveProjectFeedSources(); - - // create temp merged zip file to add feed content to - File mergedFile = null; - try { - mergedFile = File.createTempFile(project.id + "-merged", ".zip"); - mergedFile.deleteOnExit(); - } catch (IOException e) { - LOG.error("Could not create temp file"); - e.printStackTrace(); - throw e; - } - - // create the zipfile - ZipOutputStream out = new ZipOutputStream(new FileOutputStream(mergedFile)); - - LOG.info("Created project merge file: " + mergedFile.getAbsolutePath()); - - // map of feed versions to table entries contained within version's GTFS - Map feedSourceMap = new HashMap<>(); - - // collect zipFiles for each feedSource before merging tables - for (FeedSource fs : feeds) { - // check if feed source has version (use latest) - FeedVersion version = fs.retrieveLatest(); - if (version == null) { - LOG.info("Skipping {} because it has no feed versions", fs.name); - continue; - } - // modify feed version to use prepended feed id - LOG.info("Adding {} feed to merged zip", fs.name); - try { - File file = version.retrieveGtfsFile(); - if (file == null) { - LOG.error("No file exists for {}", version.id); - continue; - } - ZipFile zipFile = new ZipFile(file); - feedSourceMap.put(fs, zipFile); - } catch(Exception e) { - e.printStackTrace(); - LOG.error("Zipfile for version {} not found", version.id); - } - } - - // loop through GTFS tables - int numberOfTables = DataManager.gtfsConfig.size(); - for(int i = 0; i < numberOfTables; i++) { - JsonNode tableNode = DataManager.gtfsConfig.get(i); - byte[] tableOut = mergeTables(tableNode, feedSourceMap); - - // if at least one feed has the table, include it - if (tableOut != null) { - - String tableName = tableNode.get("name").asText(); - synchronized (status) { - status.message = "Merging " + tableName; - status.percentComplete = Math.round((double) i / numberOfTables * 10000d) / 100d; - } - // create entry for zip file - ZipEntry tableEntry = new ZipEntry(tableName); - try { - out.putNextEntry(tableEntry); - LOG.info("Writing {} to merged feed", tableName); - out.write(tableOut); - out.closeEntry(); - } catch (IOException e) { - LOG.error("Error writing to table {}", tableName); - e.printStackTrace(); - } - } - } - try { - out.close(); - } catch (IOException e) { - LOG.error("Error closing zip file"); - e.printStackTrace(); - } - synchronized (status) { - status.message = "Saving merged feed."; - status.percentComplete = 95.0; - } - // Store the project merged zip locally or on s3 - if (DataManager.useS3) { - String s3Key = "project/" + project.id + ".zip"; - 
FeedStore.s3Client.putObject(DataManager.feedBucket, s3Key, mergedFile); - LOG.info("Storing merged project feed at s3://{}/{}", DataManager.feedBucket, s3Key); - } else { - try { - FeedVersion.feedStore.newFeed(project.id + ".zip", new FileInputStream(mergedFile), null); - } catch (IOException e) { - LOG.error("Could not store feed for project {}", project.id); - e.printStackTrace(); - } - } - // delete temp file - mergedFile.delete(); - - synchronized (status) { - status.message = "Merged feed created successfully."; - status.completed = true; - status.percentComplete = 100.0; - } - } - - /** - * Merge the specified table for multiple GTFS feeds. - * @param tableNode tableNode to merge - * @param feedSourceMap map of feedSources to zipFiles from which to extract the .txt tables - * @return single merged table for feeds - */ - private static byte[] mergeTables(JsonNode tableNode, Map feedSourceMap) throws IOException { - - String tableName = tableNode.get("name").asText(); - ByteArrayOutputStream tableOut = new ByteArrayOutputStream(); - - ArrayNode fieldsNode = (ArrayNode) tableNode.get("fields"); - List headers = new ArrayList<>(); - for (int i = 0; i < fieldsNode.size(); i++) { - JsonNode fieldNode = fieldsNode.get(i); - String fieldName = fieldNode.get("name").asText(); - Boolean notInSpec = fieldNode.has("datatools") && fieldNode.get("datatools").asBoolean(); - if (notInSpec) { - fieldsNode.remove(i); - } - headers.add(fieldName); - } - - try { - // write headers to table - tableOut.write(String.join(",", headers).getBytes()); - tableOut.write("\n".getBytes()); - - // iterate over feed source to zipfile map - for ( Map.Entry mapEntry : feedSourceMap.entrySet()) { - FeedSource fs = mapEntry.getKey(); - ZipFile zipFile = mapEntry.getValue(); - final Enumeration entries = zipFile.entries(); - while (entries.hasMoreElements()) { - final ZipEntry entry = entries.nextElement(); - if(tableName.equals(entry.getName())) { - LOG.info("Adding {} table for {}", entry.getName(), fs.name); - - InputStream inputStream = zipFile.getInputStream(entry); - - BufferedReader in = new BufferedReader(new InputStreamReader(inputStream)); - String line = in.readLine(); - String[] fields = line.split(","); - - List fieldList = Arrays.asList(fields); - - - // iterate over rows in table - while((line = in.readLine()) != null) { - String[] newValues = new String[fieldsNode.size()]; - String[] values = line.split(Consts.COLUMN_SPLIT, -1); - if (values.length == 1) { - LOG.warn("Found blank line. 
Skipping..."); - continue; - } - for(int v = 0; v < fieldsNode.size(); v++) { - JsonNode fieldNode = fieldsNode.get(v); - String fieldName = fieldNode.get("name").asText(); - - // get index of field from GTFS spec as it appears in feed - int index = fieldList.indexOf(fieldName); - String val = ""; - try { - index = fieldList.indexOf(fieldName); - if(index != -1) { - val = values[index]; - } - } catch (ArrayIndexOutOfBoundsException e) { - LOG.warn("Index {} out of bounds for file {} and feed {}", index, entry.getName(), fs.name); - continue; - } - - String fieldType = fieldNode.get("inputType").asText(); - - // if field is a gtfs identifier, prepend with feed id/name - if (fieldType.contains("GTFS") && !val.isEmpty()) { - newValues[v] = fs.name + ":" + val; - } - else { - newValues[v] = val; - } - } - String newLine = String.join(",", newValues); - - // write line to table (plus new line char) - tableOut.write(newLine.getBytes()); - tableOut.write("\n".getBytes()); - } - } - } - } - } catch (IOException e) { - e.printStackTrace(); - LOG.error( - "Error merging feed sources: {}", - feedSourceMap.keySet().stream().map(fs -> fs.name).collect(Collectors.toList()).toString() - ); - throw e; - } - return tableOut.toByteArray(); - } -} diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/ProcessSingleFeedJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/ProcessSingleFeedJob.java index 4c1a11182..a5ed1a062 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/ProcessSingleFeedJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/ProcessSingleFeedJob.java @@ -50,7 +50,7 @@ public String getFeedSourceId () { public void jobLogic () { LOG.info("Processing feed for {}", feedVersion.id); - // First, load the feed into database. + // First, load the feed into database. During this stage, the GTFS file will be uploaded to S3 (and deleted locally). addNextJob(new LoadFeedJob(feedVersion, owner, isNewVersion)); // Next, validate the feed. diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/ValidateFeedJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/ValidateFeedJob.java index 25d5091fd..a48876acd 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/ValidateFeedJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/ValidateFeedJob.java @@ -4,6 +4,7 @@ import com.conveyal.datatools.common.utils.Scheduler; import com.conveyal.datatools.manager.models.FeedVersion; import com.conveyal.datatools.manager.persistence.Persistence; +import com.conveyal.gtfs.validator.ValidationResult; import com.fasterxml.jackson.annotation.JsonProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,4 +74,14 @@ public String getFeedSourceId () { return feedVersion.parentFeedSource().id; } + /** + * Getter that returns the validationResult so that once the job finishes, the client can optionally provide + * directions to users based on the success of the validation or other validation data (e.g., "The feed you have + * loaded is only valid for future dates."). 
+ */ + @JsonProperty + public ValidationResult getValidationResult () { + return feedVersion.validationResult; + } + } diff --git a/src/main/java/com/conveyal/datatools/manager/models/Model.java b/src/main/java/com/conveyal/datatools/manager/models/Model.java index d9e073c60..14c5a1b7f 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/Model.java +++ b/src/main/java/com/conveyal/datatools/manager/models/Model.java @@ -14,6 +14,8 @@ import java.util.UUID; import com.conveyal.datatools.manager.auth.Auth0UserProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.persistence.MappedSuperclass; @@ -25,6 +27,7 @@ @MappedSuperclass // applies mapping information to the subclassed entities FIXME remove? public abstract class Model implements Serializable { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(Model.class); public Model () { // This autogenerates an ID @@ -96,7 +99,17 @@ public void storeUser(Auth0UserProfile profile) { public void storeUser(String id) { userId = id; if (!Auth0Connection.authDisabled()) { - Auth0UserProfile profile = Auth0Users.getUserById(userId); + Auth0UserProfile profile = null; + // Try to fetch Auth0 user to store email address. This is surrounded by a try/catch because in the event of + // a failure we do not want to cause issues from this low-level operation. + try { + profile = Auth0Users.getUserById(userId); + } catch (Exception e) { + LOG.warn( + "Could not find user profile {} from Auth0. This may be due to testing conditions or simply a bad user ID.", + id); + e.printStackTrace(); + } userEmail = profile != null ? profile.getEmail() : null; } else { userEmail = "no_auth@conveyal.com"; diff --git a/src/main/resources/gtfs/gtfs.yml b/src/main/resources/gtfs/gtfs.yml index 5d86bca60..acc45edf0 100644 --- a/src/main/resources/gtfs/gtfs.yml +++ b/src/main/resources/gtfs/gtfs.yml @@ -318,6 +318,27 @@ columnWidth: 12 helpContent: +- id: shape + name: shapes.txt + helpContent: Shapes describe the physical path that a vehicle takes, and are defined in the file shapes.txt. Shapes belong to Trips, and consist of a sequence of points. Tracing the points in order provides the path of the vehicle. The points do not need to match stop locations. + fields: + - name: shape_id + required: true + inputType: GTFS_ID + helpContent: The shape_id field contains an ID that uniquely identifies a shape. + - name: shape_pt_lat + required: true + inputType: LATITUDE + - name: shape_pt_lon + required: true + inputType: LONGITUDE + - name: shape_pt_sequence + required: true + inputType: POSITIVE_INT + - name: shape_dist_traveled + inputType: POSITIVE_NUM + required: false + - id: trip name: trips.txt helpContent: Trips for each route. A trip is a sequence of two or more stops that occurs at specific time. diff --git a/src/test/java/com/conveyal/datatools/DatatoolsTest.java b/src/test/java/com/conveyal/datatools/DatatoolsTest.java index 19e306dc5..a62475e2c 100644 --- a/src/test/java/com/conveyal/datatools/DatatoolsTest.java +++ b/src/test/java/com/conveyal/datatools/DatatoolsTest.java @@ -6,6 +6,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; /** * Created by landon on 2/24/17. 
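The merge logic in MergeFeedsJob above hinges on a single key convention: getTableScopedValue builds "table:idScope:id" strings, and mergeFeedsResult.remappedIds and skippedIds are consulted under those keys whenever a foreign reference is written. A minimal standalone sketch of that convention follows; the ScopedKeySketch class, the feed scope, and the sample IDs are hypothetical and not part of this diff.

import java.util.HashMap;
import java.util.Map;

/**
 * Standalone sketch of the table-scoped key convention used by MergeFeedsJob.
 * Class name, feed scope, and IDs are illustrative; only the "table:scope:id"
 * format and the remappedIds lookup mirror the code in this diff.
 */
class ScopedKeySketch {
    /** Mirrors MergeFeedsJob#getTableScopedValue: join table name, scope, and ID with colons. */
    static String tableScopedValue(String tableName, String idScope, String id) {
        return String.join(":", tableName, idScope, id);
    }

    public static void main(String[] args) {
        Map<String, String> remappedIds = new HashMap<>();
        // Suppose the active feed's service_id "WKDY" collides with one in the future feed.
        // The merge would remap it to a scoped value and record that decision.
        String scopedServiceId = String.join(":", "bart_old", "WKDY");
        remappedIds.put(tableScopedValue("calendar", "bart_old", "WKDY"), scopedServiceId);
        // Later, a trips row referencing service_id "WKDY" checks the map before writing,
        // so the reference follows the remapped calendar record.
        String key = tableScopedValue("calendar", "bart_old", "WKDY");
        String valueToWrite = remappedIds.getOrDefault(key, "WKDY");
        System.out.println(valueToWrite); // prints bart_old:WKDY
    }
}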
diff --git a/src/test/java/com/conveyal/datatools/TestUtils.java b/src/test/java/com/conveyal/datatools/TestUtils.java index 3ae664478..b340dcf94 100644 --- a/src/test/java/com/conveyal/datatools/TestUtils.java +++ b/src/test/java/com/conveyal/datatools/TestUtils.java @@ -2,10 +2,23 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.stream.Collectors; + +import static com.conveyal.datatools.manager.DataManager.GTFS_DATA_SOURCE; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; public class TestUtils { + + private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class); + /** * Parse a json string into an unmapped JsonNode object */ @@ -13,4 +26,26 @@ public static JsonNode parseJson(String jsonString) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readTree(jsonString); } + + public static void assertThatSqlQueryYieldsRowCount(String sql, int expectedRowCount) throws + SQLException { + LOG.info(sql); + int recordCount = 0; + ResultSet rs = GTFS_DATA_SOURCE.getConnection().prepareStatement(sql).executeQuery(); + while (rs.next()) recordCount++; + assertThat("Records matching query should equal expected count.", recordCount, equalTo(expectedRowCount)); + } + + public static void assertThatFeedHasNoErrorsOfType (String namespace, String... errorTypes) throws SQLException { + assertThatSqlQueryYieldsRowCount( + String.format( + "select * from %s.errors where error_type in (%s)", + namespace, + Arrays.stream(errorTypes) + .map(error -> String.format("'%s'", error)) + .collect(Collectors.joining(",")) + ), + 0 + ); + } } diff --git a/src/test/java/com/conveyal/datatools/manager/jobs/MergeFeedsJobTest.java b/src/test/java/com/conveyal/datatools/manager/jobs/MergeFeedsJobTest.java new file mode 100644 index 000000000..722d1ebb2 --- /dev/null +++ b/src/test/java/com/conveyal/datatools/manager/jobs/MergeFeedsJobTest.java @@ -0,0 +1,210 @@ +package com.conveyal.datatools.manager.jobs; + +import com.conveyal.datatools.DatatoolsTest; +import com.conveyal.datatools.LoadFeedTest; +import com.conveyal.datatools.TestUtils; +import com.conveyal.datatools.manager.models.FeedSource; +import com.conveyal.datatools.manager.models.FeedVersion; +import com.conveyal.datatools.manager.models.Project; +import com.conveyal.datatools.manager.persistence.Persistence; +import com.conveyal.gtfs.error.NewGTFSErrorType; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.sql.SQLException; +import java.util.Date; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the various {@link MergeFeedsJob} merge types. 
+ */ +public class MergeFeedsJobTest { + private static final Logger LOG = LoggerFactory.getLogger(MergeFeedsJobTest.class); + private static FeedVersion bartVersion1; + private static FeedVersion bartVersion2; + private static FeedVersion calTrainVersion; + private static Project project; + private static FeedVersion napaVersion; + + /** + * Prepare and start a testing-specific web server + */ + @BeforeClass + public static void setUp() { + // start server if it isn't already running + DatatoolsTest.setUp(); + // Create a project, feed sources, and feed versions to merge. + project = new Project(); + project.name = String.format("Test %s", new Date().toString()); + Persistence.projects.create(project); + FeedSource bart = new FeedSource("BART"); + bart.projectId = project.id; + Persistence.feedSources.create(bart); + bartVersion1 = createFeedVersion(bart, "bart_old.zip"); + bartVersion2 = createFeedVersion(bart, "bart_new.zip"); + FeedSource caltrain = new FeedSource("Caltrain"); + caltrain.projectId = project.id; + Persistence.feedSources.create(caltrain); + calTrainVersion = createFeedVersion(caltrain, "caltrain_gtfs.zip"); + FeedSource napa = new FeedSource("Napa"); + napa.projectId = project.id; + Persistence.feedSources.create(napa); + napaVersion = createFeedVersion(napa, "napa-no-agency-id.zip"); + } + + /** + * Ensures that a regional feed merge will produce a feed that includes all entities from each feed. + */ + @Test + public void canMergeRegional() throws SQLException { + // Set up list of feed versions to merge. + Set versions = new HashSet<>(); + versions.add(bartVersion1); + versions.add(calTrainVersion); + versions.add(napaVersion); + MergeFeedsJob mergeFeedsJob = new MergeFeedsJob("test", versions, project.id, MergeFeedsType.REGIONAL); + // Run the job in this thread (we're not concerned about concurrency here). + mergeFeedsJob.run(); + // Create a new feed source/version for the merged feed, so we can easily analyze its contents. + FeedSource source = new FeedSource("Merged feed"); + source.projectId = project.id; + Persistence.feedSources.create(source); + File feed = FeedVersion.feedStore.getFeed(project.id + ".zip"); + LOG.info("Regional merged file: {}", feed.getAbsolutePath()); + FeedVersion mergedVersion = createFeedVersion(source, feed); + // Ensure the feed has the row counts we expect. 
+ assertEquals( + "trips count for merged feed should equal sum of trips for versions merged.", + bartVersion1.feedLoadResult.trips.rowCount + calTrainVersion.feedLoadResult.trips.rowCount + napaVersion.feedLoadResult.trips.rowCount, + mergedVersion.feedLoadResult.trips.rowCount + ); + assertEquals( + "routes count for merged feed should equal sum of routes for versions merged.", + bartVersion1.feedLoadResult.routes.rowCount + calTrainVersion.feedLoadResult.routes.rowCount + napaVersion.feedLoadResult.routes.rowCount, + mergedVersion.feedLoadResult.routes.rowCount + ); + assertEquals( + "stops count for merged feed should equal sum of stops for versions merged.", + bartVersion1.feedLoadResult.stops.rowCount + calTrainVersion.feedLoadResult.stops.rowCount + napaVersion.feedLoadResult.stops.rowCount, + mergedVersion.feedLoadResult.stops.rowCount + ); + assertEquals( + "agency count for merged feed should equal sum of agency for versions merged.", + bartVersion1.feedLoadResult.agency.rowCount + calTrainVersion.feedLoadResult.agency.rowCount + napaVersion.feedLoadResult.agency.rowCount, + mergedVersion.feedLoadResult.agency.rowCount + ); + assertEquals( + "stopTimes count for merged feed should equal sum of stopTimes for versions merged.", + bartVersion1.feedLoadResult.stopTimes.rowCount + calTrainVersion.feedLoadResult.stopTimes.rowCount + napaVersion.feedLoadResult.stopTimes.rowCount, + mergedVersion.feedLoadResult.stopTimes.rowCount + ); + assertEquals( + "calendar count for merged feed should equal sum of calendar for versions merged.", + bartVersion1.feedLoadResult.calendar.rowCount + calTrainVersion.feedLoadResult.calendar.rowCount + napaVersion.feedLoadResult.calendar.rowCount, + mergedVersion.feedLoadResult.calendar.rowCount + ); + assertEquals( + "calendarDates count for merged feed should equal sum of calendarDates for versions merged.", + bartVersion1.feedLoadResult.calendarDates.rowCount + calTrainVersion.feedLoadResult.calendarDates.rowCount + napaVersion.feedLoadResult.calendarDates.rowCount, + mergedVersion.feedLoadResult.calendarDates.rowCount + ); + // Ensure there are no referential integrity errors, duplicate ID, or wrong number of + // fields errors. + TestUtils.assertThatFeedHasNoErrorsOfType( + mergedVersion.namespace, + NewGTFSErrorType.REFERENTIAL_INTEGRITY.toString(), + NewGTFSErrorType.DUPLICATE_ID.toString(), + NewGTFSErrorType.WRONG_NUMBER_OF_FIELDS.toString() + ); + } + + /** + * Ensures that an MTC merge of feeds with duplicate trip IDs will fail. + */ + @Test + public void mergeMTCShouldFailOnDuplicateTrip() { + Set versions = new HashSet<>(); + versions.add(bartVersion1); + versions.add(bartVersion2); + MergeFeedsJob mergeFeedsJob = new MergeFeedsJob("test", versions, "merged_output", MergeFeedsType.MTC); + // Run the job in this thread (we're not concerned about concurrency here). + mergeFeedsJob.run(); + // Result should fail. + assertEquals( + "Merge feeds job should fail due to duplicate trip IDs.", + true, + mergeFeedsJob.mergeFeedsResult.failed + ); + } + + /** + * Tests that the MTC merge strategy will successfully merge BART feeds. Note: this test turns off + * {@link MergeFeedsJob#failOnDuplicateTripId} in order to force the merge to succeed even though there are duplicate + trips contained within.
+ */ + @Test + public void canMergeBARTFeeds() throws SQLException { + Set versions = new HashSet<>(); + versions.add(bartVersion1); + versions.add(bartVersion2); + MergeFeedsJob mergeFeedsJob = new MergeFeedsJob("test", versions, "merged_output", MergeFeedsType.MTC); + // This time, turn off the failOnDuplicateTripId flag. + mergeFeedsJob.failOnDuplicateTripId = false; + mergeFeedsJob.run(); + // Result should succeed this time. + assertEquals( + "Merged feed trip count should equal expected value.", + 4552, // Magic number represents the number of trips in the merged BART feed. + mergeFeedsJob.mergedVersion.feedLoadResult.trips.rowCount + ); + assertEquals( + "Merged feed route count should equal expected value.", + 9, // Magic number represents the number of routes in the merged BART feed. + mergeFeedsJob.mergedVersion.feedLoadResult.routes.rowCount + ); + // Ensure there are no referential integrity errors or duplicate ID errors. + TestUtils.assertThatFeedHasNoErrorsOfType( + mergeFeedsJob.mergedVersion.namespace, + NewGTFSErrorType.REFERENTIAL_INTEGRITY.toString(), + NewGTFSErrorType.DUPLICATE_ID.toString() + ); + } + + /** + * Utility function to create a feed version during tests. Note: this is intended to run the job in the same thread, + * so that tasks can run synchronously. + */ + public static FeedVersion createFeedVersion(FeedSource source, String gtfsFileName) { + File gtfsFile = new File(LoadFeedTest.class.getResource(gtfsFileName).getFile()); + return createFeedVersion(source, gtfsFile); + } + + /** + * Utility function to create a feed version during tests. Note: this is intended to run the job in the same thread, + * so that tasks can run synchronously. + */ + public static FeedVersion createFeedVersion(FeedSource source, File gtfsFile) { + FeedVersion version = new FeedVersion(source); + InputStream is; + try { + is = new FileInputStream(gtfsFile); + version.newGtfsFile(is); + } catch (IOException e) { + e.printStackTrace(); + } + ProcessSingleFeedJob processSingleFeedJob = new ProcessSingleFeedJob(version, "test", true); + // Run in same thread. 
+ processSingleFeedJob.run(); + return version; + } + +} diff --git a/src/test/java/com/conveyal/datatools/manager/persistence/PersistenceTest.java b/src/test/java/com/conveyal/datatools/manager/persistence/PersistenceTest.java index 846fe77bb..143aa261d 100644 --- a/src/test/java/com/conveyal/datatools/manager/persistence/PersistenceTest.java +++ b/src/test/java/com/conveyal/datatools/manager/persistence/PersistenceTest.java @@ -30,7 +30,7 @@ public void createFeedSource() { String id = feedSource.id; Persistence.feedSources.create(feedSource); String retrievedId = Persistence.feedSources.getById(id).id; - assertEquals("Found FeedSource ID should equal inserted ID.", retrievedId, id); + assertEquals("Found FeedSource ID should equal inserted ID.", id, retrievedId); } // @Test @@ -59,7 +59,7 @@ public void createProject() { String id = project.id; Persistence.projects.create(project); String retrievedId = Persistence.projects.getById(id).id; - assertEquals("Found Project ID should equal inserted ID.", retrievedId, id); + assertEquals("Found Project ID should equal inserted ID.", id, retrievedId); } // // @Test diff --git a/src/test/resources/com/conveyal/datatools/bart_new.zip b/src/test/resources/com/conveyal/datatools/bart_new.zip new file mode 100644 index 000000000..917d60d91 Binary files /dev/null and b/src/test/resources/com/conveyal/datatools/bart_new.zip differ diff --git a/src/test/resources/com/conveyal/datatools/bart_old.zip b/src/test/resources/com/conveyal/datatools/bart_old.zip new file mode 100644 index 000000000..6f4af8fcd Binary files /dev/null and b/src/test/resources/com/conveyal/datatools/bart_old.zip differ diff --git a/src/test/resources/com/conveyal/datatools/napa-no-agency-id.zip b/src/test/resources/com/conveyal/datatools/napa-no-agency-id.zip new file mode 100644 index 000000000..bfb38c737 Binary files /dev/null and b/src/test/resources/com/conveyal/datatools/napa-no-agency-id.zip differ
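The tests above drive MergeFeedsJob synchronously with run() rather than submitting it to DataManager.heavyExecutor, then inspect the public job state directly. A condensed sketch of that pattern follows; it assumes the same package as MergeFeedsJobTest (so the job's fields are accessible), and the MergeSketch class, owner string, and output name are illustrative rather than part of this change set.

package com.conveyal.datatools.manager.jobs;

import com.conveyal.datatools.manager.models.FeedVersion;

import java.util.HashSet;
import java.util.Set;

/**
 * Condensed sketch of the synchronous merge pattern used in MergeFeedsJobTest.
 * The class itself is illustrative; constructor arguments and field access follow the tests above.
 */
class MergeSketch {
    /** Merge two versions of the same feed source with the MTC strategy and report success. */
    static boolean mtcMergeSucceeded(FeedVersion activeVersion, FeedVersion futureVersion) {
        Set<FeedVersion> versions = new HashSet<>();
        versions.add(activeVersion);
        versions.add(futureVersion);
        // Arguments mirror the tests: owner, versions to merge, output name, merge type.
        MergeFeedsJob job = new MergeFeedsJob("test", versions, "merged_output", MergeFeedsType.MTC);
        // As in canMergeBARTFeeds, tolerate duplicate trip_ids instead of failing the merge.
        job.failOnDuplicateTripId = false;
        // Run in the current thread; tests do not use DataManager.heavyExecutor.
        job.run();
        if (job.mergeFeedsResult.failed) {
            // failureReasons collects human-readable explanations, e.g. trip ID conflicts.
            job.mergeFeedsResult.failureReasons.forEach(System.out::println);
        }
        return !job.mergeFeedsResult.failed;
    }
}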