Skip to content
This repository has been archived by the owner on Sep 25, 2022. It is now read-only.

Commit

Permalink
🐛 Fix issues 46, 47, & 48
Browse files Browse the repository at this point in the history
  • Loading branch information
pmonks committed Oct 7, 2015
1 parent 6fb5a5f commit 1ae3d70
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<shortname>Bulk Import - Stop</shortname>
<description>Web Script that provides a way to request that a bulk import be stopped.</description>
<url>/bulk/import/stop</url>
<format default="json"/>
<authentication>admin</authentication>
<family>Bulk Import</family>
<cache>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[#ftl]
{
"result" : "${result}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,17 @@
# content file size.
alfresco-bulk-import.batch.weight=100

# The size of the thread pool during the folder import phase (must be > 0)
alfresco-bulk-import.folder.threadpool.size=2
# The size of the thread pool during the file import phase (<= 0 means autosize
# based on the number of CPU cores in the server)
# The size of the thread pool (during the file import phase only)
# <= 0 means autosize based on the number of CPU cores in the server
alfresco-bulk-import.file.threadpool.size=-1

# The maximum size (number of batches) allowed in the queue, before scanning
# receives back-pressure (i.e. gets blocked)
alfresco-bulk-import.batch.queue.size=100

# How long to keep inactive threads alive
alfresco-bulk-import.threadpool.keepAlive.time=30
alfresco-bulk-import.threadpool.keepAlive.units=SECONDS
alfresco-bulk-import.threadpool.keepAlive.time=10
alfresco-bulk-import.threadpool.keepAlive.units=MINUTES


###############################################################################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
<bean id="bit.import-thread-pool"
class="org.alfresco.extension.bulkimport.impl.BulkImportThreadPoolExecutor"
scope="prototype">
<constructor-arg index="0" value="${alfresco-bulk-import.folder.threadpool.size}" />
<constructor-arg index="1" value="${alfresco-bulk-import.file.threadpool.size}" />
<constructor-arg index="2" value="${alfresco-bulk-import.batch.queue.size}" />
<constructor-arg index="3" value="${alfresco-bulk-import.threadpool.keepAlive.time}" />
<constructor-arg index="4" value="${alfresco-bulk-import.threadpool.keepAlive.units}" />
<constructor-arg index="0" value="${alfresco-bulk-import.file.threadpool.size}" />
<constructor-arg index="1" value="${alfresco-bulk-import.batch.queue.size}" />
<constructor-arg index="2" value="${alfresco-bulk-import.threadpool.keepAlive.time}" />
<constructor-arg index="3" value="${alfresco-bulk-import.threadpool.keepAlive.units}" />
</bean>

<bean id="bit.batch-importer"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
<!-- Filesystem source -->
<bean id="bit.fs.source"
class="org.alfresco.extension.bulkimport.source.fs.FilesystemBulkImportSource">
<constructor-arg index="0" ref="bit.fs.directory-analyser" />
<constructor-arg index="1" ref="fileContentStore" />
<constructor-arg index="2" ref="bit.fs.source.import-filters" />
<constructor-arg index="0" ref="bit.status" />
<constructor-arg index="1" ref="bit.fs.directory-analyser" />
<constructor-arg index="2" ref="fileContentStore" />
<constructor-arg index="3" ref="bit.fs.source.import-filters" />
</bean>

<!-- Directory Analyser -->
Expand Down
1 change: 0 additions & 1 deletion amp/src/main/amp/web/scripts/bulkimport/bulkimport.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
* This file contains the browser functionality used by the HTML status page.
*/

//####TODO: CHANGE THIS PRIOR TO RELEASE!
var logLevel = log.levels.DEBUG; // See http://pimterry.github.io/loglevel/ for details

// Global variables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,18 @@ private final void importBatchImpl(final NodeRef target,
{
for (final BulkImportItem<BulkImportItemVersion> item : batch)
{
if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");

importItem(target, item, replaceExisting, dryRun);
}
}
}


private final void importItem(final NodeRef target,
private final void importItem(final NodeRef target,
final BulkImportItem<BulkImportItemVersion> item,
final boolean replaceExisting,
final boolean dryRun)
final boolean replaceExisting,
final boolean dryRun)
throws InterruptedException
{
try
Expand All @@ -216,9 +216,13 @@ private final void importItem(final NodeRef target,

if (trace(log)) trace(log, "Finished importing " + String.valueOf(item));
}
catch (final InterruptedException ie)
{
Thread.currentThread().interrupt();
throw ie;
}
catch (final OutOfOrderBatchException oobe)
{
// Fix issue #40 - https://github.com/pmonks/alfresco-bulk-import/issues/40
throw oobe;
}
catch (final Exception e)
Expand Down Expand Up @@ -403,7 +407,7 @@ else if (numberOfVersions == 1)

for (final BulkImportItemVersion version : item.getVersions())
{
if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");

importVersion(nodeRef, previousVersion, version, dryRun, false);
previousVersion = version;
Expand Down Expand Up @@ -508,7 +512,7 @@ private final void importVersionMetadata(final NodeRef nodeRef,
{
for (final String aspect : aspects)
{
if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");

if (dryRun)
{
Expand All @@ -532,7 +536,7 @@ private final void importVersionMetadata(final NodeRef nodeRef,

for (final String key : metadata.keySet())
{
if (Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");
if (importStatus.isStopping() || Thread.currentThread().isInterrupted()) throw new InterruptedException(Thread.currentThread().getName() + " was interrupted. Terminating early.");

QName keyQName = createQName(serviceRegistry, key);
Serializable value = metadata.get(key);
Expand Down Expand Up @@ -626,5 +630,4 @@ private final void importVersionContent(final NodeRef nodeRef,
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@ public void importComplete()
this.endDate = new Date();
this.threadPool = null;

if (ProcessingState.STOPPING.equals(this.state))
if (isStopping())
{
this.state = ProcessingState.STOPPED;
}
else if (this.lastException != null)
else if (getLastException() != null)
{
this.state = ProcessingState.FAILED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public Thread newThread(final Runnable runnable)
}


/**
* Resets this thread pool (i.e. sets the thread number counter back to zero).
*/
public void reset()
{
currentThreadNumber.set(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,30 @@ public class BulkImportThreadPoolExecutor
{
private final static Log log = LogFactory.getLog(BulkImportThreadPoolExecutor.class);

private final static int DEFAULT_FOLDER_THREAD_POOL_SIZE = 2;
private final static int DEFAULT_FILE_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; // We naively assume 50% of time is spent blocked on I/O
private final static long DEFAULT_KEEP_ALIVE_TIME = 1L;
private final static TimeUnit DEFAULT_KEEP_ALIVE_TIME_UNIT = TimeUnit.MINUTES;
private final static int DEFAULT_QUEUE_SIZE = 10000;
private final static int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; // We naively assume 50% of time is spent blocked on I/O
private final static long DEFAULT_KEEP_ALIVE_TIME = 10L;
private final static TimeUnit DEFAULT_KEEP_ALIVE_TIME_UNIT = TimeUnit.MINUTES;
private final static int DEFAULT_QUEUE_SIZE = 100;


public BulkImportThreadPoolExecutor(final int folderThreadPoolSize,
final int fileThreadPoolSize,
public BulkImportThreadPoolExecutor(final int threadPoolSize,
final int queueSize,
final long keepAliveTime,
final TimeUnit keepAliveTimeUnit)
{
super(folderThreadPoolSize <= 0 ? DEFAULT_FOLDER_THREAD_POOL_SIZE : folderThreadPoolSize, // Core pool size
fileThreadPoolSize <= 0 ? DEFAULT_FILE_THREAD_POOL_SIZE : fileThreadPoolSize, // Max pool size (same as core pool size)
keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime, // Keep alive
keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit, // Keep alive units
new ArrayBlockingQueue<Runnable>(queueSize <= (BulkImporterImpl.DEFAULT_BATCH_WEIGHT * 2) ? DEFAULT_QUEUE_SIZE : queueSize, true), // Queue, with fairness enabled (to get true FIFO, thereby minimising out-of-order retries)
new BulkImportThreadFactory(), // Thread factory
new ThreadPoolExecutor.AbortPolicy()); // Rejection handler
super(threadPoolSize <= 0 ? DEFAULT_THREAD_POOL_SIZE : threadPoolSize, // Core pool size
threadPoolSize <= 0 ? DEFAULT_THREAD_POOL_SIZE : threadPoolSize, // Max pool size (same as core pool size)
keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime, // Keep alive
keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit, // Keep alive units
new ArrayBlockingQueue<Runnable>((queueSize <= 0 ? DEFAULT_QUEUE_SIZE : queueSize), true), // Queue, with fairness enabled (to get true FIFO, thereby minimising out-of-order retries)
new BulkImportThreadFactory(), // Thread factory
new ThreadPoolExecutor.AbortPolicy()); // Rejection handler

if (debug(log)) debug(log, "Creating new bulk import thread pool." +
" Folder thread Pool Size=" + (folderThreadPoolSize <= 0 ? DEFAULT_FOLDER_THREAD_POOL_SIZE : folderThreadPoolSize) +
" Thread Pool Size=" + (fileThreadPoolSize <= 0 ? DEFAULT_FILE_THREAD_POOL_SIZE : fileThreadPoolSize) +
", Queue Size=" + (queueSize <= 0 ? DEFAULT_QUEUE_SIZE : queueSize) +
", Keep Alive Time=" + (keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime) +
" " + String.valueOf(keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit));
" Thread Pool Size=" + (threadPoolSize <= 0 ? DEFAULT_THREAD_POOL_SIZE : threadPoolSize) +
", Queue Size=" + (queueSize <= 0 ? DEFAULT_QUEUE_SIZE : queueSize) +
", Keep Alive Time=" + (keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime) +
" " + String.valueOf(keepAliveTimeUnit == null ? DEFAULT_KEEP_ALIVE_TIME_UNIT : keepAliveTimeUnit));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,15 +219,26 @@ public void start(final BulkImportSource source,
@Override
public void stop()
{
// Note: this must be called first, as the various threads look for this status to determine if their
// interruption was expected or not.
importStatus.stopRequested();

if (scannerThread != null &&
scannerThread.isAlive())
if (importStatus.inProgress())
{
if (info(log)) info(log, "Stop requested.");

// Note: this must be called first, as the various threads look for this status to determine if their
// interruption was expected or not.
importStatus.stopRequested();

if (scannerThread != null)
{
scannerThread.interrupt(); // This indirectly whacks the entire import thread pool too
}
else
{
if (warn(log)) warn(log, "Scanner thread was null.");
}
}
else
{
scannerThread.interrupt(); // This indirectly whacks the entire import thread pool too
scannerThread = null;
throw new IllegalStateException("No import in progress.");
}
}

Expand Down
Loading

0 comments on commit 1ae3d70

Please sign in to comment.