From 1ae3d70664a12f9e23e901a4ab20bc497d02c105 Mon Sep 17 00:00:00 2001 From: Peter Monks Date: Tue, 6 Oct 2015 20:36:37 -0700 Subject: [PATCH] :bug: Fix issues 46, 47, & 48 --- .../extension/bulkimport/stop.post.desc.xml | 1 + .../bulkimport/stop.post.json.202.ftl | 4 ++ .../alfresco-global.properties | 10 ++-- .../context/core-context.xml | 9 ++- .../context/filesystem-source-context.xml | 7 ++- .../amp/web/scripts/bulkimport/bulkimport.js | 1 - .../bulkimport/impl/BatchImporterImpl.java | 21 ++++--- .../bulkimport/impl/BulkImportStatusImpl.java | 4 +- .../impl/BulkImportThreadFactory.java | 3 + .../impl/BulkImportThreadPoolExecutor.java | 35 +++++------ .../bulkimport/impl/BulkImporterImpl.java | 27 ++++++--- .../extension/bulkimport/impl/Scanner.java | 58 +++++++++---------- .../source/fs/DirectoryAnalyser.java | 10 ++++ .../source/fs/FilesystemBulkImportSource.java | 17 ++++-- .../extension/bulkimport/util/LogUtils.java | 34 ++++++++++- .../extension/bulkimport/util/Utils.java | 15 ++--- .../webscripts/BulkImportStopWebScript.java | 14 ++--- .../bulkimport/BulkImportCallback.java | 1 + .../source/AbstractBulkImportSource.java | 12 ++-- .../module-context.xml | 6 +- .../source/sample/SampleSource.java | 4 +- .../make-large-folder-structure.clj | 2 +- 22 files changed, 180 insertions(+), 115 deletions(-) create mode 100644 amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.json.202.ftl diff --git a/amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.desc.xml b/amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.desc.xml index 6dd2c11..9a89345 100644 --- a/amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.desc.xml +++ b/amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.desc.xml @@ -2,6 +2,7 @@ Bulk Import - Stop Web Script that provides a way to request that a bulk import be stopped. /bulk/import/stop + admin Bulk Import diff --git a/amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.json.202.ftl b/amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.json.202.ftl new file mode 100644 index 0000000..0d8b597 --- /dev/null +++ b/amp/src/main/amp/config/alfresco/extension/templates/webscripts/org/alfresco/extension/bulkimport/stop.post.json.202.ftl @@ -0,0 +1,4 @@ +[#ftl] +{ + "result" : "${result}" +} diff --git a/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/alfresco-global.properties b/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/alfresco-global.properties index 84784b8..6b9c750 100644 --- a/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/alfresco-global.properties +++ b/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/alfresco-global.properties @@ -10,10 +10,8 @@ # content file size. alfresco-bulk-import.batch.weight=100 -# The size of the thread pool during the folder import phase (must be > 0) -alfresco-bulk-import.folder.threadpool.size=2 -# The size of the thread pool during the file import phase (<= 0 means autosize -# based on the number of CPU cores in the server) +# The size of the thread pool (during the file import phase only) +# <= 0 means autosize based on the number of CPU cores in the server alfresco-bulk-import.file.threadpool.size=-1 # The maximum size (number of batches) allowed in the queue, before scanning @@ -21,8 +19,8 @@ alfresco-bulk-import.file.threadpool.size=-1 alfresco-bulk-import.batch.queue.size=100 # How long to keep inactive threads alive -alfresco-bulk-import.threadpool.keepAlive.time=30 -alfresco-bulk-import.threadpool.keepAlive.units=SECONDS +alfresco-bulk-import.threadpool.keepAlive.time=10 +alfresco-bulk-import.threadpool.keepAlive.units=MINUTES ############################################################################### diff --git a/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/context/core-context.xml b/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/context/core-context.xml index 34331e5..be3362e 100644 --- a/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/context/core-context.xml +++ b/amp/src/main/amp/config/alfresco/module/org.alfresco.extension.alfresco-bulk-import/context/core-context.xml @@ -21,11 +21,10 @@ - - - - - + + + + - - - + + + + diff --git a/amp/src/main/amp/web/scripts/bulkimport/bulkimport.js b/amp/src/main/amp/web/scripts/bulkimport/bulkimport.js index b2556ad..0babd62 100644 --- a/amp/src/main/amp/web/scripts/bulkimport/bulkimport.js +++ b/amp/src/main/amp/web/scripts/bulkimport/bulkimport.js @@ -21,7 +21,6 @@ * This file contains the browser functionality used by the HTML status page. */ -//####TODO: CHANGE THIS PRIOR TO RELEASE! var logLevel = log.levels.DEBUG; // See http://pimterry.github.io/loglevel/ for details // Global variables diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BatchImporterImpl.java b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BatchImporterImpl.java index 1069d62..c5ef9fe 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BatchImporterImpl.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BatchImporterImpl.java @@ -180,7 +180,7 @@ private final void importBatchImpl(final NodeRef target, { for (final BulkImportItem item : batch) { - if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); importItem(target, item, replaceExisting, dryRun); } @@ -188,10 +188,10 @@ private final void importBatchImpl(final NodeRef target, } - private final void importItem(final NodeRef target, + private final void importItem(final NodeRef target, final BulkImportItem item, - final boolean replaceExisting, - final boolean dryRun) + final boolean replaceExisting, + final boolean dryRun) throws InterruptedException { try @@ -216,9 +216,13 @@ private final void importItem(final NodeRef target, if (trace(log)) trace(log, "Finished importing " + String.valueOf(item)); } + catch (final InterruptedException ie) + { + Thread.currentThread().interrupt(); + throw ie; + } catch (final OutOfOrderBatchException oobe) { - // Fix issue #40 - https://github.com/pmonks/alfresco-bulk-import/issues/40 throw oobe; } catch (final Exception e) @@ -403,7 +407,7 @@ else if (numberOfVersions == 1) for (final BulkImportItemVersion version : item.getVersions()) { - if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); importVersion(nodeRef, previousVersion, version, dryRun, false); previousVersion = version; @@ -508,7 +512,7 @@ private final void importVersionMetadata(final NodeRef nodeRef, { for (final String aspect : aspects) { - if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); if (dryRun) { @@ -532,7 +536,7 @@ private final void importVersionMetadata(final NodeRef nodeRef, for (final String key : metadata.keySet()) { - if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); QName keyQName = createQName(serviceRegistry, key); Serializable value = metadata.get(key); @@ -626,5 +630,4 @@ private final void importVersionContent(final NodeRef nodeRef, } } - } diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportStatusImpl.java b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportStatusImpl.java index 1e685a2..e175375 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportStatusImpl.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportStatusImpl.java @@ -262,11 +262,11 @@ public void importComplete() this.endDate = new Date(); this.threadPool = null; - if (ProcessingState.STOPPING.equals(this.state)) + if (isStopping()) { this.state = ProcessingState.STOPPED; } - else if (this.lastException != null) + else if (getLastException() != null) { this.state = ProcessingState.FAILED; } diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadFactory.java b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadFactory.java index a903cb2..6c576ce 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadFactory.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadFactory.java @@ -71,6 +71,9 @@ public Thread newThread(final Runnable runnable) } + /** + * Resets this thread pool (i.e. sets the thread number counter back to zero). + */ public void reset() { currentThreadNumber.set(0); diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadPoolExecutor.java b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadPoolExecutor.java index 4aa5a6a..5e79fd1 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadPoolExecutor.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImportThreadPoolExecutor.java @@ -42,33 +42,30 @@ public class BulkImportThreadPoolExecutor { private final static Log log = LogFactory.getLog(BulkImportThreadPoolExecutor.class); - private final static int DEFAULT_FOLDER_THREAD_POOL_SIZE = 2; - private final static int DEFAULT_FILE_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; // We naively assume 50% of time is spent blocked on I/O - private final static long DEFAULT_KEEP_ALIVE_TIME = 1L; - private final static TimeUnit DEFAULT_KEEP_ALIVE_TIME_UNIT = TimeUnit.MINUTES; - private final static int DEFAULT_QUEUE_SIZE = 10000; + private final static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; // We naively assume 50% of time is spent blocked on I/O + private final static long DEFAULT_KEEP_ALIVE_TIME = 10L; + private final static TimeUnit DEFAULT_KEEP_ALIVE_TIME_UNIT = TimeUnit.MINUTES; + private final static int DEFAULT_QUEUE_SIZE = 100; - public BulkImportThreadPoolExecutor(final int folderThreadPoolSize, - final int fileThreadPoolSize, + public BulkImportThreadPoolExecutor(final int threadPoolSize, final int queueSize, final long keepAliveTime, final TimeUnit keepAliveTimeUnit) { - super(folderThreadPoolSize <= 0 ? DEFAULT_FOLDER_THREAD_POOL_SIZE : folderThreadPoolSize, // Core pool size - fileThreadPoolSize <= 0 ? DEFAULT_FILE_THREAD_POOL_SIZE : fileThreadPoolSize, // Max pool size (same as core pool size) - keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime, // Keep alive - keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit, // Keep alive units - new ArrayBlockingQueue(queueSize <= (BulkImporterImpl.DEFAULT_BATCH_WEIGHT * 2) ? DEFAULT_QUEUE_SIZE : queueSize, true), // Queue, with fairness enabled (to get true FIFO, thereby minimising out-of-order retries) - new BulkImportThreadFactory(), // Thread factory - new ThreadPoolExecutor.AbortPolicy()); // Rejection handler + super(threadPoolSize <= 0 ? DEFAULT_THREAD_POOL_SIZE : threadPoolSize, // Core pool size + threadPoolSize <= 0 ? DEFAULT_THREAD_POOL_SIZE : threadPoolSize, // Max pool size (same as core pool size) + keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime, // Keep alive + keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit, // Keep alive units + new ArrayBlockingQueue((queueSize <= 0 ? DEFAULT_QUEUE_SIZE : queueSize), true), // Queue, with fairness enabled (to get true FIFO, thereby minimising out-of-order retries) + new BulkImportThreadFactory(), // Thread factory + new ThreadPoolExecutor.AbortPolicy()); // Rejection handler if (debug(log)) debug(log, "Creating new bulk import thread pool." + - " Folder thread Pool Size=" + (folderThreadPoolSize <= 0 ? DEFAULT_FOLDER_THREAD_POOL_SIZE : folderThreadPoolSize) + - " Thread Pool Size=" + (fileThreadPoolSize <= 0 ? DEFAULT_FILE_THREAD_POOL_SIZE : fileThreadPoolSize) + - ", Queue Size=" + (queueSize <= 0 ? DEFAULT_QUEUE_SIZE : queueSize) + - ", Keep Alive Time=" + (keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime) + - " " + String.valueOf(keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit)); + " Thread Pool Size=" + (threadPoolSize <= 0 ? DEFAULT_THREAD_POOL_SIZE : threadPoolSize) + + ", Queue Size=" + (queueSize <= 0 ? DEFAULT_QUEUE_SIZE : queueSize) + + ", Keep Alive Time=" + (keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime) + + " " + String.valueOf(keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit)); } diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImporterImpl.java b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImporterImpl.java index 0ab8709..dd07f80 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImporterImpl.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/BulkImporterImpl.java @@ -219,15 +219,26 @@ public void start(final BulkImportSource source, @Override public void stop() { - // Note: this must be called first, as the various threads look for this status to determine if their - // interruption was expected or not. - importStatus.stopRequested(); - - if (scannerThread != null && - scannerThread.isAlive()) + if (importStatus.inProgress()) + { + if (info(log)) info(log, "Stop requested."); + + // Note: this must be called first, as the various threads look for this status to determine if their + // interruption was expected or not. + importStatus.stopRequested(); + + if (scannerThread != null) + { + scannerThread.interrupt(); // This indirectly whacks the entire import thread pool too + } + else + { + if (warn(log)) warn(log, "Scanner thread was null."); + } + } + else { - scannerThread.interrupt(); // This indirectly whacks the entire import thread pool too - scannerThread = null; + throw new IllegalStateException("No import in progress."); } } diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/Scanner.java b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/Scanner.java index 129b736..6b7e08d 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/impl/Scanner.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/impl/Scanner.java @@ -30,8 +30,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.alfresco.service.ServiceRegistry; import org.alfresco.service.cmr.repository.NodeRef; + import org.alfresco.extension.bulkimport.BulkImportCallback; import org.alfresco.extension.bulkimport.BulkImportCompletionHandler; import org.alfresco.extension.bulkimport.BulkImportStatus; @@ -92,6 +94,7 @@ public final class Scanner private int currentBatchNumber; private List> currentBatch; private int weightOfCurrentBatch; + private boolean filePhase; private boolean multiThreadedImport; @@ -137,6 +140,7 @@ public Scanner(final ServiceRegistry serviceRegistry, currentBatchNumber = 0; currentBatch = null; weightOfCurrentBatch = 0; + filePhase = false; multiThreadedImport = false; } @@ -166,29 +170,21 @@ public void run() rootPhaser.register(); - // Default pool sizes (which get overridden per phase) - final int folderPhasePoolSize = importThreadPool.getCorePoolSize(); - final int filePhasePoolSize = importThreadPool.getMaximumPoolSize(); - // ------------------------------------------------------------------ - // Phase 1 - Folder scanning + // Phase 1 - Folder scanning (single threaded) // ------------------------------------------------------------------ - // Minimise level of concurrency, to reduce risk of out-of-order batches (child before parent) - - importThreadPool.setCorePoolSize(folderPhasePoolSize); - importThreadPool.setMaximumPoolSize(folderPhasePoolSize); source.scanFolders(importStatus, this); - if (debug(log)) debug(log, "Folder scan complete in " + getHumanReadableDuration(importStatus.getDurationInNs()) + "."); + if (debug(log)) debug(log, "Folder import complete in " + getHumanReadableDuration(importStatus.getDurationInNs()) + "."); // ------------------------------------------------------------------ // Phase 2 - File scanning // ------------------------------------------------------------------ + filePhase = true; + // Maximise level of concurrency, since there's no longer any risk of out-of-order batches - importThreadPool.setCorePoolSize(filePhasePoolSize); - importThreadPool.setMaximumPoolSize(filePhasePoolSize); source.scanFiles(importStatus, this); if (debug(log)) debug(log, "File scan complete in " + getHumanReadableDuration(importStatus.getDurationInNs()) + "."); @@ -201,19 +197,18 @@ public void run() submitCurrentBatch(); // Submit whatever is left in the final (partial) batch... awaitCompletion(); - importThreadPool.shutdown(); - importThreadPool.await(); if (debug(log)) debug(log, "Import complete" + (multiThreadedImport ? ", thread pool shutdown" : "") + "."); } catch (final Throwable t) { - Throwable rootCause = getRootCause(t); // Unwrap exceptions to get the root cause + Throwable rootCause = getRootCause(t); String rootCauseClassName = rootCause.getClass().getName(); - if (rootCause instanceof InterruptedException || - rootCause instanceof ClosedByInterruptException || - "com.hazelcast.core.RuntimeInterruptedException".equals(rootCauseClassName)) // Avoid a static dependency on Hazelcast... + if (importStatus.isStopping() && + (rootCause instanceof InterruptedException || + rootCause instanceof ClosedByInterruptException || + "com.hazelcast.core.RuntimeInterruptedException".equals(rootCauseClassName))) // For compatibility across 4.x *sigh* { // A stop import was requested if (debug(log)) debug(log, Thread.currentThread().getName() + " was interrupted by a stop request.", t); @@ -270,7 +265,7 @@ public void run() } } - + /** * @see org.alfresco.extension.bulkimport.BulkImportCallback#submit(org.alfresco.extension.bulkimport.source.BulkImportItem) */ @@ -292,7 +287,7 @@ public synchronized void submit(final BulkImportItem item) } // Body - if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); // If the weight of the new item would blow out the current batch, submit the batch as-is (i.e. *before* adding the newly submitted item). // This ensures that heavy items start a new batch (and possibly end up in a batch by themselves). @@ -339,7 +334,7 @@ private synchronized void submitCurrentBatch() batchImporter.importBatch(this, userId, target, batch, replaceExisting, dryRun); // Check if the multi-threading threshold has been reached - multiThreadedImport = currentBatchNumber >= MULTITHREADING_THRESHOLD; + multiThreadedImport = filePhase && currentBatchNumber >= MULTITHREADING_THRESHOLD; if (multiThreadedImport && debug(log)) debug(log, "Multi-threading threshold (" + MULTITHREADING_THRESHOLD + " batch" + pluralise(MULTITHREADING_THRESHOLD, "es") + ") reached - switching to multi-threaded import."); } @@ -409,6 +404,9 @@ private final void awaitCompletion() logStatusInfo(); } } + + importThreadPool.shutdown(); + importThreadPool.await(); } } @@ -508,21 +506,17 @@ public void run() } catch (final Throwable t) { - Throwable rootCause = t; - - while (rootCause.getCause() != null) - { - rootCause = rootCause.getCause(); - } - - String rootCauseClassName = rootCause.getClass().getName(); + Throwable rootCause = getRootCause(t); + String rootCauseClassName = rootCause.getClass().getName(); - if (rootCause instanceof InterruptedException || - rootCause instanceof ClosedByInterruptException || - "com.hazelcast.core.RuntimeInterruptedException".equals(rootCauseClassName)) // For compatibility across 4.x *sigh* + if (importStatus.isStopping() && + (rootCause instanceof InterruptedException || + rootCause instanceof ClosedByInterruptException || + "com.hazelcast.core.RuntimeInterruptedException".equals(rootCauseClassName))) // For compatibility across 4.x *sigh* { // A stop import was requested if (debug(log)) debug(log, Thread.currentThread().getName() + " was interrupted by a stop request.", t); + Thread.currentThread().interrupt(); } else { diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/DirectoryAnalyser.java b/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/DirectoryAnalyser.java index d17ebb7..8f170dc 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/DirectoryAnalyser.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/DirectoryAnalyser.java @@ -145,6 +145,7 @@ public Pair, List> anal private Pair, List> analyseDirectory(final String sourceRelativeParentDirectory, final File[] directoryListing) + throws InterruptedException { Pair, List> result = null; @@ -163,6 +164,7 @@ private Pair, List> ana private Map>> categoriseFiles(final File[] directoryListing) + throws InterruptedException { Map>> result = null; @@ -172,6 +174,8 @@ private Map>> categoriseFiles(fin for (final File file : directoryListing) { + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + categoriseFile(result, file); } } @@ -244,6 +248,7 @@ private void categoriseFile(final Map, List> constructImportItems(final String sourceRelativeParentDirectory, final Map>> categorisedFiles) + throws InterruptedException { Pair, List> result = null; @@ -256,6 +261,8 @@ private Pair, List> con for (final String parentName : categorisedFiles.keySet()) { + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + final SortedMap> itemVersions = categorisedFiles.get(parentName); final NavigableSet versions = constructImportItemVersions(itemVersions); final boolean isDirectory = versions.last().isDirectory(); @@ -280,6 +287,7 @@ private Pair, List> con private final NavigableSet constructImportItemVersions(final SortedMap> itemVersions) + throws InterruptedException { // PRECONDITIONS if (itemVersions == null) throw new IllegalArgumentException("itemVersions cannot be null."); @@ -290,6 +298,8 @@ private final NavigableSet constructImportItemV for (final BigDecimal versionNumber : itemVersions.keySet()) { + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + final Pair contentAndMetadataFiles = itemVersions.get(versionNumber); final FilesystemBulkImportItemVersion version = new FilesystemBulkImportItemVersion(serviceRegistry, configuredContentStore, diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/FilesystemBulkImportSource.java b/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/FilesystemBulkImportSource.java index 43dc91c..ab75689 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/FilesystemBulkImportSource.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/source/fs/FilesystemBulkImportSource.java @@ -28,8 +28,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.alfresco.repo.content.ContentStore; import org.alfresco.util.Pair; + import org.alfresco.extension.bulkimport.BulkImportCallback; import org.alfresco.extension.bulkimport.source.AbstractBulkImportSource; import org.alfresco.extension.bulkimport.source.BulkImportSourceStatus; @@ -62,11 +64,12 @@ public final class FilesystemBulkImportSource private File sourceDirectory = null; - public FilesystemBulkImportSource(final DirectoryAnalyser directoryAnalyser, - final ContentStore configuredContentStore, - final List importFilters) + public FilesystemBulkImportSource(final BulkImportSourceStatus importStatus, + final DirectoryAnalyser directoryAnalyser, + final ContentStore configuredContentStore, + final List importFilters) { - super(IMPORT_SOURCE_NAME, IMPORT_SOURCE_DESCRIPTION, IMPORT_SOURCE_CONFIG_UI_URI, null); + super(importStatus, IMPORT_SOURCE_NAME, IMPORT_SOURCE_DESCRIPTION, IMPORT_SOURCE_CONFIG_UI_URI, null); // PRECONDITIONS assert directoryAnalyser != null : "directoryAnalyser must not be null."; @@ -200,6 +203,8 @@ private void scanDirectory(final BulkImportSourceStatus status, { for (final FilesystemBulkImportItem directoryItem : directoryItems) { + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (!filter(directoryItem)) { callback.submit(directoryItem); @@ -211,6 +216,8 @@ private void scanDirectory(final BulkImportSourceStatus status, { for (final FilesystemBulkImportItem fileItem : fileItems) { + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (!filter(fileItem)) { callback.submit(fileItem); @@ -227,6 +234,8 @@ private void scanDirectory(final BulkImportSourceStatus status, for (final FilesystemBulkImportItem directoryItem : directoryItems) { + if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early."); + if (!filter(directoryItem)) { final FilesystemBulkImportItemVersion lastVersion = directoryItem.getVersions().last(); // Directories shouldn't have versions, but grab the last one (which will have the directory file pointer) just in case... diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/util/LogUtils.java b/amp/src/main/java/org/alfresco/extension/bulkimport/util/LogUtils.java index 5285258..d939998 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/util/LogUtils.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/util/LogUtils.java @@ -20,6 +20,10 @@ package org.alfresco.extension.bulkimport.util; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; + import org.apache.commons.logging.Log; /** @@ -134,7 +138,35 @@ public final static String getHumanReadableDuration(final long durationInNs, fin public final static String getDurationInSeconds(final long durationInNs) { return((float)durationInNs / NS_PER_SECOND + "s"); - } + } + + + public final static String dumpThread(final String threadName) + { + final StringBuilder result = new StringBuilder(); + final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + final ThreadInfo[] threadsInfo = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); + + for (final ThreadInfo threadInfo : threadsInfo) + { + if (threadName == null || threadName.equals(threadInfo.getThreadName())) + { + result.append("\nName: "); + result.append(threadInfo.getThreadName()); + result.append("\nState: "); + result.append(threadInfo.getThreadState()); + result.append("\nStack Trace:"); + + for (final StackTraceElement stackTraceElement : threadInfo.getStackTrace()) + { + result.append("\n\t\tat "); + result.append(stackTraceElement); + } + } + } + + return(result.toString()); + } // TRACE level methods diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/util/Utils.java b/amp/src/main/java/org/alfresco/extension/bulkimport/util/Utils.java index 0c6aac5..b0a7c33 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/util/Utils.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/util/Utils.java @@ -217,10 +217,10 @@ public final static String pluralise(final Number number, final String pluralFor public final static String buildTextMessage(final Throwable t) { - StringBuffer result = new StringBuffer(); - String timeOfFailure = (new Date()).toString(); - String hostName = null; - String ipAddress = null; + StringBuilder result = new StringBuilder(); + String timeOfFailure = (new Date()).toString(); + String hostName = null; + String ipAddress = null; try { @@ -252,7 +252,7 @@ public final static String buildTextMessage(final Throwable t) public final static String renderExceptionStackAsText(final Throwable t) { - StringBuffer result = new StringBuffer(); + StringBuilder result = new StringBuilder(); if (t != null) { @@ -284,7 +284,7 @@ public final static String renderExceptionStackAsText(final Throwable t) private final static String renderStackTraceElements(final StackTraceElement[] elements) { - StringBuffer result = new StringBuffer(); + StringBuilder result = new StringBuilder(); if (elements != null) { @@ -296,5 +296,6 @@ private final static String renderStackTraceElements(final StackTraceElement[] e return(result.toString()); } - + + } diff --git a/amp/src/main/java/org/alfresco/extension/bulkimport/webscripts/BulkImportStopWebScript.java b/amp/src/main/java/org/alfresco/extension/bulkimport/webscripts/BulkImportStopWebScript.java index 4d0b6cd..cc54c76 100644 --- a/amp/src/main/java/org/alfresco/extension/bulkimport/webscripts/BulkImportStopWebScript.java +++ b/amp/src/main/java/org/alfresco/extension/bulkimport/webscripts/BulkImportStopWebScript.java @@ -64,18 +64,14 @@ protected Map executeImpl(final WebScriptRequest request, final if (importer.getStatus().inProgress()) { - if (importer.getStatus().isStopping()) - { - status.setCode(Status.STATUS_ACCEPTED, "A stop has previously been requested, and is in progress."); - } - else - { - importer.stop(); - status.setCode(Status.STATUS_ACCEPTED, "Stop requested."); - } + result.put("result", "stop requested"); + importer.stop(); + status.setCode(Status.STATUS_ACCEPTED, "Stop requested."); + status.setRedirect(true); // Make sure the custom 202 status template is used (why this is needed at all is beyond me...) } else { + result.put("result", "no imports in progress"); status.setCode(Status.STATUS_BAD_REQUEST, "No bulk imports are in progress."); } diff --git a/api/src/main/java/org/alfresco/extension/bulkimport/BulkImportCallback.java b/api/src/main/java/org/alfresco/extension/bulkimport/BulkImportCallback.java index 3858301..1983f3e 100644 --- a/api/src/main/java/org/alfresco/extension/bulkimport/BulkImportCallback.java +++ b/api/src/main/java/org/alfresco/extension/bulkimport/BulkImportCallback.java @@ -49,4 +49,5 @@ public interface BulkImportCallback @SuppressWarnings("rawtypes") public void submit(BulkImportItem item) throws InterruptedException; + } diff --git a/api/src/main/java/org/alfresco/extension/bulkimport/source/AbstractBulkImportSource.java b/api/src/main/java/org/alfresco/extension/bulkimport/source/AbstractBulkImportSource.java index 56b0ebc..7d42a4a 100644 --- a/api/src/main/java/org/alfresco/extension/bulkimport/source/AbstractBulkImportSource.java +++ b/api/src/main/java/org/alfresco/extension/bulkimport/source/AbstractBulkImportSource.java @@ -34,17 +34,21 @@ public abstract class AbstractBulkImportSource implements BulkImportSource { + protected final BulkImportSourceStatus importStatus; + private final String name; private final String description; private final String configWebScriptUri; private final String[] counterNames; - protected AbstractBulkImportSource(final String name, - final String description, - final String configWebScriptUri, - final String[] counterNames) + protected AbstractBulkImportSource(final BulkImportSourceStatus importStatus, + final String name, + final String description, + final String configWebScriptUri, + final String[] counterNames) { + this.importStatus = importStatus; this.name = name; this.description = description; this.configWebScriptUri = configWebScriptUri; diff --git a/contrib/sources/sample-source/src/main/amp/config/alfresco/module/org.alfresco.extension.sample-bulk-import-source/module-context.xml b/contrib/sources/sample-source/src/main/amp/config/alfresco/module/org.alfresco.extension.sample-bulk-import-source/module-context.xml index ca1db4d..accd36f 100644 --- a/contrib/sources/sample-source/src/main/amp/config/alfresco/module/org.alfresco.extension.sample-bulk-import-source/module-context.xml +++ b/contrib/sources/sample-source/src/main/amp/config/alfresco/module/org.alfresco.extension.sample-bulk-import-source/module-context.xml @@ -9,6 +9,8 @@ http://www.springframework.org/schema/util/spring-util-3.0.xsd"> - - + + + + diff --git a/contrib/sources/sample-source/src/main/java/org/alfresco/extension/bulkimport/source/sample/SampleSource.java b/contrib/sources/sample-source/src/main/java/org/alfresco/extension/bulkimport/source/sample/SampleSource.java index 2b34e35..67a5064 100644 --- a/contrib/sources/sample-source/src/main/java/org/alfresco/extension/bulkimport/source/sample/SampleSource.java +++ b/contrib/sources/sample-source/src/main/java/org/alfresco/extension/bulkimport/source/sample/SampleSource.java @@ -49,9 +49,9 @@ public final class SampleSource private final static String[] SOURCE_COUNTERS = { SOURCE_COUNTER_NAME_FOLDERS_SYNTHESISED, SOURCE_COUNTER_NAME_FILES_SYNTHESISED }; - public SampleSource() + public SampleSource(final BulkImportSourceStatus importStatus) { - super(IMPORT_SOURCE_NAME, IMPORT_SOURCE_DESCRIPTION, IMPORT_SOURCE_CONFIG_UI_URI, SOURCE_COUNTERS); + super(importStatus, IMPORT_SOURCE_NAME, IMPORT_SOURCE_DESCRIPTION, IMPORT_SOURCE_CONFIG_UI_URI, SOURCE_COUNTERS); } diff --git a/test/data/SinglePassTests/FolderStructureTests/make-large-folder-structure.clj b/test/data/SinglePassTests/FolderStructureTests/make-large-folder-structure.clj index 63cf367..33b6f42 100755 --- a/test/data/SinglePassTests/FolderStructureTests/make-large-folder-structure.clj +++ b/test/data/SinglePassTests/FolderStructureTests/make-large-folder-structure.clj @@ -29,7 +29,7 @@ (require '[clojure.string :as s]) ; Be very careful changing this, as it's raised to the power 5 to come up with the final directory list! -(def dirs-per-dir 13) +(def dirs-per-dir 12) (def min-name-length 4) (def max-name-length 20)