From a990003af7f77d95cadc8f781f172fdcdb76c67b Mon Sep 17 00:00:00 2001
From: Pascal Essiembre
Date: Mon, 14 Oct 2024 16:26:33 -0400
Subject: [PATCH] Fixed error being thrown when issuing stop command. Now
 requires Java 11.

https://github.com/Norconex/crawlers/issues/1067
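
The lock file now stores the current JVM PID and process liveness is
checked with ProcessHandle instead of relying on commons-lang's
FileLocker; together with Files.readString/writeString and "var", this
is what moves the minimum Java version to 11. Roughly, the isRunning()
check introduced below boils down to this simplified sketch (not the
exact patched code):

    var pid = Long.parseLong(Files.readString(pidFile).trim());
    var running = ProcessHandle.of(pid)
            .map(ProcessHandle::isAlive)
            .orElse(false);

ICollectorStopper.fireStopRequest() now receives the Collector to act
on (e.g., stopper.fireStopRequest(collector)), so a stop command issued
from a separate JVM no longer trips on a collector instance that was
never started/initialized in that JVM.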
---
 CHANGES.xml                                  |  11 +-
 TODO.txt                                     |   3 +
 pom.xml                                      |  10 +-
 .../norconex/collector/core/Collector.java   | 104 +++++++++++++-----
 .../collector/core/crawler/Crawler.java      |  15 ++-
 .../GenericSpoiledReferenceStrategizer.java  |  16 +--
 .../core/stop/ICollectorStopper.java         |  12 +-
 .../core/stop/impl/FileBasedStopper.java     |  71 ++++++------
 .../collector/core/CollectorTest.java        |  15 ++-
 9 files changed, 165 insertions(+), 92 deletions(-)

diff --git a/CHANGES.xml b/CHANGES.xml
index 029d7e7..17642a0 100644
--- a/CHANGES.xml
+++ b/CHANGES.xml
@@ -6,7 +6,16 @@
   Norconex Inc.
-
+
+
+
+      Minimum Java Version is now 11.
+
+
+      Fixed crawler throwing error when issuing a stop command.
+
+
+
       New "deferredShutdownDuration" collector configuration option to delay
diff --git a/TODO.txt b/TODO.txt
index 276824b..ac58e3b 100644
--- a/TODO.txt
+++ b/TODO.txt
@@ -3,6 +3,9 @@ TODO:

 - Force crawler to stop after X amount of time of no progress.

+- Have the GenericURLNormalizer offer a mix of replacements and canned rules
+  in desired order of execution.
+
 - Performance:
   - Keep tack of counts by having counters in memory instead of querying
     for count. And have maxXXX for different types instead of just
diff --git a/pom.xml b/pom.xml
index e62a059..9209d3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,11 +19,11 @@
     <groupId>com.norconex.commons</groupId>
     <artifactId>norconex-commons-maven-parent</artifactId>
-    <version>1.0.2</version>
+    <version>1.1.0-SNAPSHOT</version>
   </parent>
   <groupId>com.norconex.collectors</groupId>
   <artifactId>norconex-collector-core</artifactId>
-  <version>2.0.2</version>
+  <version>2.1.0-SNAPSHOT</version>
   <name>Norconex Collector Core</name>
@@ -130,6 +130,12 @@
       <classifier>resources</classifier>
       <type>zip</type>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>xml-apis</groupId>
+          <artifactId>xml-apis</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
diff --git a/src/main/java/com/norconex/collector/core/Collector.java b/src/main/java/com/norconex/collector/core/Collector.java
index 6b79ab5..644ebf2 100644
--- a/src/main/java/com/norconex/collector/core/Collector.java
+++ b/src/main/java/com/norconex/collector/core/Collector.java
@@ -26,7 +26,6 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
@@ -62,8 +61,6 @@
 import com.norconex.commons.lang.Sleeper;
 import com.norconex.commons.lang.VersionUtil;
 import com.norconex.commons.lang.event.EventManager;
-import com.norconex.commons.lang.file.FileAlreadyLockedException;
-import com.norconex.commons.lang.file.FileLocker;
 import com.norconex.commons.lang.file.FileUtil;
 import com.norconex.commons.lang.io.CachedStreamFactory;
 import com.norconex.commons.lang.time.DurationFormatter;
@@ -89,7 +86,6 @@ public abstract class Collector {
         + "| |\\ | |_| | _ <| |__| |_| | |\\ | |___ / \\ \n"
         + "|_| \\_|\\___/|_| \\_\\\\____\\___/|_| \\_|_____/_/\\_\\\n\n"
         + "============== C O L L E C T O R ==============\n";
-
     private static final Logger LOG =
             LoggerFactory.getLogger(Collector.class);
@@ -103,7 +99,6 @@ public abstract class Collector {
     private CachedStreamFactory streamFactory;
     private Path workDir;
     private Path tempDir;
-    private FileLocker lock;

     //TODO make configurable
     private final ICollectorStopper stopper = new FileBasedStopper();
@@ -162,8 +157,8 @@ public synchronized Path getTempDir() {
     private Path createCollectorSubDirectory(Path parentDir) {
         Objects.requireNonNull(parentDir, "'parentDir' must not be null.");
-        String fileSafeId = FileUtil.toSafeFileName(getId());
-        Path subDir = parentDir.resolve(fileSafeId);
+        var fileSafeId = FileUtil.toSafeFileName(getId());
+        var subDir = parentDir.resolve(fileSafeId);
         try {
             Files.createDirectories(subDir);
         } catch (IOException e) {
@@ -190,8 +185,8 @@ public void start() {
         eventManager.fire(new CollectorEvent.Builder(
                 COLLECTOR_RUN_BEGIN, this).build());

-        List<Crawler> crawlerList = getCrawlers();
-        int maxConcurrent = collectorConfig.getMaxConcurrentCrawlers();
+        var crawlerList = getCrawlers();
+        var maxConcurrent = collectorConfig.getMaxConcurrentCrawlers();
         if (maxConcurrent <= 0) {
             maxConcurrent = crawlerList.size();
         }
@@ -239,7 +234,7 @@ private void orderlyShutdown() {
     }

     private void startConcurrentCrawlers(int poolSize) {
-        Duration d = collectorConfig.getCrawlersStartInterval();
+        var d = collectorConfig.getCrawlersStartInterval();
         if (d == null || d.toMillis() <= 0) {
             startConcurrentCrawlers(
                     poolSize,
@@ -259,8 +254,8 @@ private void startConcurrentCrawlers(
             int poolSize,
             IntFunction<ExecutorService> poolSupplier,
             BiConsumer<ExecutorService, Runnable> crawlerExecuter) {
-        final CountDownLatch latch = new CountDownLatch(crawlers.size());
-        ExecutorService pool = poolSupplier.apply(poolSize);
+        final var latch = new CountDownLatch(crawlers.size());
+        var pool = poolSupplier.apply(poolSize);
         try {
             getCrawlers().forEach(c -> crawlerExecuter.accept(pool, () -> {
                 c.start();
@@ -284,6 +279,7 @@ private void startConcurrentCrawlers(
     public void clean() {
         MdcUtil.setCollectorId(getId());
         Thread.currentThread().setName(getId() + "/CLEAN");
+        lock();
         try {
             initCollector();
@@ -293,6 +289,7 @@ public void clean() {
                         + "impact previously committed data)...")
                 .build());
         getCrawlers().forEach(Crawler::clean);
+        Thread.currentThread().setName(getId() + "/CLEAN");
         destroyCollector();
         eventManager.fire(new CollectorEvent.Builder(
                 COLLECTOR_CLEAN_END, this)
@@ -382,7 +379,10 @@ protected void destroyCollector() {
     }

     public void fireStopRequest() {
-        stopper.fireStopRequest();
+        LOG.info("Firing stop request...");
+        if (stopper.fireStopRequest(this)) {
+            LOG.info("Stop request fired.");
+        }
     }

     /**
@@ -390,6 +390,7 @@
      * different JVM instance than the instance we want to stop.
      */
     public void stop() {
+        LOG.debug("Stopping the crawler.");
         if (!isRunning()) {
             LOG.info("CANNOT STOP: Collector is not running.");
             return;
         }
@@ -425,14 +426,15 @@ public EventManager getEventManager() {
      * Loads all crawlers from configuration.
      */
     private void createCrawlers() {
-        if (getCrawlers().isEmpty()) {
-            List<CrawlerConfig> crawlerConfigs =
+        if (crawlers.isEmpty()) {
+            var crawlerConfigs =
                     collectorConfig.getCrawlerConfigs();
             if (crawlerConfigs != null) {
                 for (CrawlerConfig crawlerConfig : crawlerConfigs) {
                     crawlers.add(createCrawler(crawlerConfig));
                 }
             }
+            LOG.debug("Created {} crawler(s).", crawlers.size());
         } else {
             LOG.debug("Crawlers already created.");
         }
@@ -480,7 +482,7 @@ public String getVersion() {
     }

     public String getReleaseVersions() {
-        StringBuilder b = new StringBuilder()
+        var b = new StringBuilder()
                 .append(NORCONEX_ASCII)
                 .append("\nCollector and main components:\n")
                 .append("\n");
@@ -529,39 +531,81 @@ private Set<Class<?>> nonCoreClasspathCommitters() {

     protected synchronized void lock() {
         LOG.debug("Locking collector execution...");
-        lock = new FileLocker(getWorkDir().resolve(".collector-lock"));
-        try {
-            lock.lock();
-        } catch (FileAlreadyLockedException e) {
+
+        if (isRunning()) {
             throw new CollectorException(
-                    "The collector you are attempting to run is already "
-                    + "running or executing a command. Wait for "
-                    + "it to complete or stop it and try again.");
+                    "The collector you are attempting to run is "
+                    + "already running or executing a command. Wait for "
+                    + "it to complete or stop it and try again.");
+        }
+
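+        // Record the current JVM PID in the lock file; isRunning() later
+        // checks whether that exact process is still alive.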
+        var pidFile = getLockFile();
+        var pid = ProcessHandle.current().pid();
+        try {
+            Files.writeString(pidFile, Long.toString(pid));
+            LOG.debug("PID saved to {}", pidFile.toAbsolutePath());
         } catch (IOException e) {
             throw new CollectorException(
-                    "Could not create a collector execution lock.", e);
+                    "Could not create the following file to lock the "
+                    + "crawler execution: " + pidFile.toAbsolutePath(), e);
         }
-        LOG.debug("Collector execution locked");
+
+        LOG.debug("Collector execution locked.");
     }

     protected synchronized void unlock() {
+        var pidFile = getLockFile();
         try {
-            if (lock != null) {
-                lock.unlock();
+            if (Files.exists(pidFile)) {
+                Files.delete(pidFile);
+            } else {
+                LOG.debug("Could not unlock crawler execution. "
+                        + "Lock file not found: {}", pidFile);
             }
         } catch (IOException e) {
             throw new CollectorException(
-                    "Cannot unlock collector execution.", e);
+                    "Cannot unlock collector execution. Could not delete "
+                    + "this file: " + pidFile.toAbsolutePath(), e);
         }
-        lock = null;
         LOG.debug("Collector execution unlocked");
     }

     public boolean isRunning() {
-        return lock != null && lock.isLocked();
+        var pidFile = getLockFile();
+
+        // If the PID file already exists, check if the process it references
+        // is currently running. Otherwise, we consider it not running.
+        if (Files.exists(pidFile)) {
+            try {
+                var pidStr = Files.readString(pidFile).trim();
+                var pid = Long.parseLong(pidStr);
+                var processHandle = ProcessHandle.of(pid);
+                if (!processHandle.isPresent()
+                        || !processHandle.get().isAlive()) {
+                    LOG.debug("Found a lock file but the corresponding process "
+                            + "is not running. Deleting it: {}",
+                            pidFile.toAbsolutePath());
+                    Files.delete(pidFile);
+                    return false;
+                }
+                return true;
+            } catch (IOException | NumberFormatException e) {
+                throw new CollectorException(
+                        "Could not verify if the collector is already running. "
+                        + "If you get this error again after confirming it is "
+                        + "not running, delete the following file and try "
+                        + "again: " + pidFile.toAbsolutePath(), e);
+            }
+        }
+        return false;
     }

     @Override
     public String toString() {
-        return getId();
+        var id = getId();
+        return StringUtils.isBlank(id) ? "" : id;
+    }
+
+    private Path getLockFile() {
+        return getWorkDir().resolve(".collector-lock");
     }
 }
diff --git a/src/main/java/com/norconex/collector/core/crawler/Crawler.java b/src/main/java/com/norconex/collector/core/crawler/Crawler.java
index 19292dc..a234eaa 100644
--- a/src/main/java/com/norconex/collector/core/crawler/Crawler.java
+++ b/src/main/java/com/norconex/collector/core/crawler/Crawler.java
@@ -243,15 +243,16 @@ public Path getDownloadDir() {
      * Starts crawling.
      */
     public void start() {
-        boolean resume = initCrawler();
+        initCrawler();
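+        // Opening the doc info service tells us whether an earlier session
+        // was left unfinished (i.e., whether this start is a resume).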
+        boolean resume = crawlDocInfoService.open();
+
         importer = new Importer(
                 getCrawlerConfig().getImporterConfig(),
                 getEventManager());
         monitor = new CrawlerMonitor(this);
         //TODO make interval configurable
         //TODO make general logging messages verbosity configurable
-        progressLogger = new CrawlProgressLogger(
-                monitor, 30 * 1000);
+        progressLogger = new CrawlProgressLogger(monitor, 30_000);
         progressLogger.startTracking();

         if (Boolean.getBoolean("enableJMX")) {
@@ -266,8 +267,8 @@ public void start() {

             //TODO move this code to a config validator class?
             if (StringUtils.isBlank(getCrawlerConfig().getId())) {
-                throw new CollectorException("Crawler must be given "
-                        + "a unique identifier (id).");
+                throw new CollectorException(
+                        "Crawler must be given a unique identifier (id).");
             }
             doExecute();
         } finally {
@@ -295,7 +296,7 @@ private void logUsefulInfo() {
         }
     }

-    protected boolean initCrawler() {
+    protected void initCrawler() {
         Thread.currentThread().setName(getId());
         MdcUtil.setCrawlerId(getId());
@@ -325,11 +326,9 @@
                 .build();
         committers.init(committerContext);

-        boolean resuming = crawlDocInfoService.open();
         getEventManager().fire(new CrawlerEvent.Builder(CRAWLER_INIT_END, this)
                 .message("Crawler \"" + getId()
                         + "\" initialized successfully.").build());
-        return resuming;
     }

     protected Class<? extends CrawlDocInfo> getCrawlDocInfoType() {
diff --git a/src/main/java/com/norconex/collector/core/spoil/impl/GenericSpoiledReferenceStrategizer.java b/src/main/java/com/norconex/collector/core/spoil/impl/GenericSpoiledReferenceStrategizer.java
index 92960a1..807d81f 100644
--- a/src/main/java/com/norconex/collector/core/spoil/impl/GenericSpoiledReferenceStrategizer.java
+++ b/src/main/java/com/norconex/collector/core/spoil/impl/GenericSpoiledReferenceStrategizer.java
@@ -42,7 +42,8 @@
  * The mappings defined by default are as follow:
  * </p>
- * <table summary="Default mappings">
+ * <table>
+ *   <caption>Default mappings</caption>
  *   <tr><th>Crawl state</th><th>Strategy</th></tr>
  *   <tr><td>NOT_FOUND</td><td>DELETE</td></tr>
  *   <tr><td>BAD_STATUS</td><td>GRACE_ONCE</td></tr>
@@ -86,7 +87,6 @@ public class GenericSpoiledReferenceStrategizer implements
             DEFAULT_FALLBACK_STRATEGY;

     public GenericSpoiledReferenceStrategizer() {
-        super();
         // store default mappings
         mappings.put(CrawlState.NOT_FOUND, SpoiledReferenceStrategy.DELETE);
         mappings.put(CrawlState.BAD_STATUS,
@@ -98,7 +98,7 @@
     public SpoiledReferenceStrategy resolveSpoiledReferenceStrategy(
             String reference, CrawlState state) {

-        SpoiledReferenceStrategy strategy = mappings.get(state);
+        var strategy = mappings.get(state);
         if (strategy == null) {
             strategy = getFallbackStrategy();
         }
@@ -122,7 +122,7 @@ public void addMapping(

     @Override
     public void loadFromXML(XML xml) {
-        SpoiledReferenceStrategy fallback = xml.getEnum(
+        var fallback = xml.getEnum(
                 "@fallbackStrategy", SpoiledReferenceStrategy.class,
                 fallbackStrategy);
         if (fallback != null) {
@@ -130,13 +130,13 @@
         }

         for (XML node : xml.getXMLList("mapping")) {
-            String attribState = node.getString("@state", null);
-            SpoiledReferenceStrategy strategy = node.getEnum(
+            var attribState = node.getString("@state", null);
+            var strategy = node.getEnum(
                     "@strategy", SpoiledReferenceStrategy.class);
             if (StringUtils.isBlank(attribState) || strategy == null) {
                 continue;
             }
-            CrawlState state = CrawlState.valueOf(attribState);
+            var state = CrawlState.valueOf(attribState);
             addMapping(state, strategy);
         }
     }
@@ -158,7 +158,7 @@ public boolean equals(final Object other) {
         if (!(other instanceof GenericSpoiledReferenceStrategizer)) {
             return false;
         }
-        GenericSpoiledReferenceStrategizer castOther =
+        var castOther =
                 (GenericSpoiledReferenceStrategizer) other;
         return new EqualsBuilder()
                 .append(fallbackStrategy, castOther.fallbackStrategy)
diff --git a/src/main/java/com/norconex/collector/core/stop/ICollectorStopper.java b/src/main/java/com/norconex/collector/core/stop/ICollectorStopper.java
index 7533445..7001818 100644
--- a/src/main/java/com/norconex/collector/core/stop/ICollectorStopper.java
+++ b/src/main/java/com/norconex/collector/core/stop/ICollectorStopper.java
@@ -19,7 +19,7 @@
 /**
  * <p>
  * Responsible for shutting down a Collector upon explicit invocation
- * of {@link #fireStopRequest()} or when specific conditions are met.
+ * of {@link #fireStopRequest(Collector)} or when specific conditions are met.
  * See concrete implementation for what those conditions could be.
  * </p>
@@ -48,10 +48,16 @@ void listenForStopRequest(Collector collector)
             throws CollectorStopperException;

     void destroy() throws CollectorStopperException;

     /**
-     * Stops a currently running Collector.
+     * Stops a currently running Collector. The supplied collector
+     * may or may not be the same instance as the one to be stopped.
+     * Implementors should account for a non-initialized
+     * instance (where configuration for crawlers is set, but no crawlers
+     * were yet created with those configurations).
+     * @param collector collector instance
      * @return true if the Collector was running and successfully
      * stopped or false if the Collector was not running.
      * @throws CollectorStopperException could not stop running Collector.
      */
-    boolean fireStopRequest() throws CollectorStopperException;
+    boolean fireStopRequest(Collector collector)
+            throws CollectorStopperException;
 }
diff --git a/src/main/java/com/norconex/collector/core/stop/impl/FileBasedStopper.java b/src/main/java/com/norconex/collector/core/stop/impl/FileBasedStopper.java
index 8a4e1d9..2f8becc 100644
--- a/src/main/java/com/norconex/collector/core/stop/impl/FileBasedStopper.java
+++ b/src/main/java/com/norconex/collector/core/stop/impl/FileBasedStopper.java
@@ -17,8 +17,9 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;

 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -28,7 +29,6 @@
 import com.norconex.collector.core.monitor.MdcUtil;
 import com.norconex.collector.core.stop.CollectorStopperException;
 import com.norconex.collector.core.stop.ICollectorStopper;
-import com.norconex.commons.lang.Sleeper;

 /**
  * Listens for STOP requests using a stop file. The stop file
@@ -44,17 +44,19 @@ public class FileBasedStopper implements ICollectorStopper {
             LoggerFactory.getLogger(FileBasedStopper.class);

     private Collector startedCollector;
-    private boolean monitoring;
+    private ScheduledExecutorService execService;
+    private volatile boolean shutdownRequested = false;

     @Override
     public void listenForStopRequest(Collector startedCollector)
             throws CollectorStopperException {
         this.startedCollector = startedCollector;
-        final Path stopFile = stopFile(startedCollector);
+        final var stopFile = getStopFile(startedCollector);

-        // If there is already a stop file and the collector is not running,
-        // delete it
-        if (stopFile.toFile().exists() && !startedCollector.isRunning()) {
+        // At this point, we know there is only one instance of this crawler
+        // running. So upon starting, if there is a stop file, it has to be
+        // an old one that was not properly deleted. Delete it here.
+        if (stopFile.toFile().exists()) {
             LOG.info("Old stop file found, deleting it.");
             try {
                 FileUtils.forceDelete(stopFile.toFile());
             } catch (IOException e) {
             }
         }

-        ExecutorService execService = Executors.newSingleThreadExecutor();
-        try {
-            execService.submit(() -> {
-                MdcUtil.setCollectorId(startedCollector.getId());
-                Thread.currentThread().setName("Collector stop file monitor");
-                monitoring = true;
-                while(monitoring) {
-                    if (stopFile.toFile().exists()) {
-                        stopMonitoring(startedCollector);
-                        LOG.info("STOP request received.");
-                        startedCollector.stop();
-                    }
-                    Sleeper.sleepMillis(100);
-                }
-                return null;
-            });
-        } finally {
-            execService.shutdownNow();
-        }
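+        // Poll for the stop file on a fixed 500 ms delay instead of
+        // busy-looping; the task becomes a no-op once shutdown of the
+        // monitor itself has been requested.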
+        execService = Executors.newSingleThreadScheduledExecutor();
+        execService.scheduleWithFixedDelay(() -> {
+            if (shutdownRequested) {
+                return;
+            }
+            MdcUtil.setCollectorId(startedCollector.getId());
+            Thread.currentThread().setName("Collector stop file monitor");
+            if (stopFile.toFile().exists()) {
+                LOG.info("STOP request received.");
+                startedCollector.stop();
+            }
+        }, 0, 500, TimeUnit.MILLISECONDS);
     }

     @Override
@@ -94,10 +89,12 @@ public void destroy() throws CollectorStopperException {
     }

     @Override
-    public boolean fireStopRequest() throws CollectorStopperException {
-        final Path stopFile = stopFile(startedCollector);
+    public boolean fireStopRequest(
+            Collector shallowCollector) throws CollectorStopperException {

-        if (!startedCollector.isRunning()) {
+        final var stopFile = getStopFile(shallowCollector);
+
+        if (!shallowCollector.isRunning()) {
             LOG.info("CANNOT STOP: The Collector is not running.");
             return false;
         }
@@ -119,8 +116,18 @@
     private synchronized void stopMonitoring(Collector collector)
             throws CollectorStopperException {
-        monitoring = false;
-        Path stopFile = stopFile(collector);
+        LOG.debug("Shutting down stop monitor service...");
+        if (execService != null) {
+            try {
+                shutdownRequested = true;
+                execService.shutdown();
+            } finally {
+                execService = null;
+                shutdownRequested = false;
+            }
+        }
+        LOG.debug("Stop monitor service stopped");
+        var stopFile = getStopFile(collector);
         try {
             Files.deleteIfExists(stopFile);
         } catch (IOException e) {
             throw new CollectorStopperException(
                     "Cannot delete stop file: " + stopFile.toAbsolutePath(), e);
         }
     }
-    private static Path stopFile(Collector collector) {
+    private static Path getStopFile(Collector collector) {
         return collector.getWorkDir().resolve(".collector-stop");
     }
 }
diff --git a/src/test/java/com/norconex/collector/core/CollectorTest.java b/src/test/java/com/norconex/collector/core/CollectorTest.java
index 984bb15..6bf94ff 100644
--- a/src/test/java/com/norconex/collector/core/CollectorTest.java
+++ b/src/test/java/com/norconex/collector/core/CollectorTest.java
@@ -24,7 +24,6 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.junit.jupiter.api.Test;

-import com.norconex.collector.core.crawler.CrawlerConfig;
 import com.norconex.collector.core.crawler.MockCrawlerConfig;
 import com.norconex.collector.core.filter.impl.ExtensionReferenceFilter;
 import com.norconex.committer.core3.fs.impl.JSONFileCommitter;
@@ -40,16 +39,16 @@ public class CollectorTest {

     @Test
     public void testWriteRead() {
-        MockCollectorConfig config = new MockCollectorConfig();
+        var config = new MockCollectorConfig();
         config.setId("test-collector");
         config.setMaxConcurrentCrawlers(100);
         config.setEventListeners(new MockCollectorEventListener());

-        MockCrawlerConfig crawlerCfg = new MockCrawlerConfig();
+        var crawlerCfg = new MockCrawlerConfig();
         crawlerCfg.setId("myCrawler");
         crawlerCfg.setCommitters(new JSONFileCommitter());

-        config.setCrawlerConfigs(new CrawlerConfig[] {crawlerCfg});
+        config.setCrawlerConfigs(crawlerCfg);

         XML.assertWriteRead(config, "collector");
     }

@@ -57,13 +56,13 @@
     @SuppressWarnings("deprecation")
     @Test
     public void testOverwriteCrawlerDefaults() throws IOException {
-        MockCollectorConfig cfg = new MockCollectorConfig();
+        var cfg = new MockCollectorConfig();
         try (Reader r = new InputStreamReader(getClass().getResourceAsStream(
                 "overwrite-crawlerDefaults.xml"))) {
             XML.of(r).create().populate(cfg);
         }

-        MockCrawlerConfig crawlA =
+        var crawlA =
                 (MockCrawlerConfig) cfg.getCrawlerConfigs().get(0);
         assertEquals(22, crawlA.getNumThreads(), "crawlA");
@@ -81,7 +80,7 @@
                 crawlA.getCommitters().get(0)).getDirectory().toString(),
                 "crawlA");

-        MockCrawlerConfig crawlB =
+        var crawlB =
                 (MockCrawlerConfig) cfg.getCrawlerConfigs().get(1);
         assertEquals(1, crawlB.getNumThreads(), "crawlB");
         assertEquals("defaultFilter", ((ExtensionReferenceFilter)
@@ -105,7 +104,7 @@
     public void testValidation() throws IOException {
         try (Reader r = new InputStreamReader(getClass().getResourceAsStream(
                 "/validation/collector-core-full.xml"))) {
-            ErrorHandlerCapturer eh = new ErrorHandlerCapturer();
+            var eh = new ErrorHandlerCapturer();
             XML.of(r).setErrorHandler(eh).create().populate(
                     new MockCollectorConfig());
             assertEquals(0, eh.getErrors().size(),
