Skip to content

Commit

Permalink
Keep the reload/reset event processing logic at the event processing …
Browse files Browse the repository at this point in the history
…level

* Prevent double event dispatching. Reload implies reset, avoid
  sending both events on reload().

* Revert changes to GeoServerImpl (overloading of resert() and reload()
  with a bolean `silent` argument). It doesn't prevent that once the
  Catalog and/or the GeoServer are reloaded, while the newly loaded
  objects are sync'ed to the in-memory one, CatalogInfoAdded and
  ConfigInfoAdded events are sent to all pods, and these ones in turn
  also do so, resulting in an explosion of events.
  Instead, disable event publishing completely while reload() and
  reset() are being processed:

  * `CatalogApplicationEventPublisher` gets new methods:
    `disable()`, `enable()`, and `enabled():boolean`.
  * `GeoServerLifecycleEventPublisher.beforeReload()` sends the
    `ReloadEvent` and disables event publishing while `reload()` is
    processed locally.
  * `GeoServerLifecycleEventPublisher.reloaded()` re-enables event
    publishing.
  • Loading branch information
groldan committed Nov 29, 2024
1 parent 0732254 commit d4efdb7
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 130 deletions.
2 changes: 1 addition & 1 deletion compose/.env
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ GEOSERVER_BASE_PATH=/geoserver/cloud
# logging profile, either "default" or "json-logs"
#LOGGING_PROFILE=json-logs
LOGGING_PROFILE=default
GEOSERVER_DEFAULT_PROFILES="${LOGGING_PROFILE},acl"
GEOSERVER_DEFAULT_PROFILES="${LOGGING_PROFILE},acl,logging_debug_events"

GATEWAY_DEFAULT_PROFILES=${LOGGING_PROFILE}
DISCOVERY_SERVER_DEFAULT_PROFILES=${LOGGING_PROFILE}
Expand Down
2 changes: 1 addition & 1 deletion config
Submodule config updated 1 files
+1 −0 geoserver_logging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@

import lombok.extern.slf4j.Slf4j;

import org.geoserver.cloud.config.catalog.events.CatalogApplicationEventPublisher;
import org.geoserver.cloud.event.GeoServerEvent;
import org.geoserver.cloud.event.lifecycle.ReloadEvent;
import org.geoserver.cloud.event.lifecycle.ResetEvent;
import org.geoserver.config.plugin.GeoServerImpl;
import org.springframework.context.event.EventListener;

/**
* Listens for and processes {@link ResetEvent} and {@link ReloadEvent} events.
* Listens for and processes {@link GeoServerEvent#isRemote() remote} {@link ResetEvent} and {@link
* ReloadEvent} events.
*
* @since 1.0
*/
@Slf4j(topic = "org.geoserver.cloud.event.remote.lifecycle")
@Slf4j
public class LifecycleEventProcessor {

private final GeoServerImpl rawGeoServer;
Expand All @@ -32,21 +35,35 @@ public LifecycleEventProcessor(GeoServerImpl rawGeoServer) {
public void onReset(ResetEvent event) {

if (event.isRemote()) {
log.debug("Received a remote ResetEvent, triggering a GeoServer reset ({})", event);
rawGeoServer.reset(true);
log.debug("Disabling event publishing while processing {}", event);
CatalogApplicationEventPublisher.disable();
try {
rawGeoServer.reset();
log.debug("Reenabling event publishing after {}", event);
} finally {
CatalogApplicationEventPublisher.enable();
}
} else {
log.debug("Ignoring local {}", event);
}
}

@EventListener(ReloadEvent.class)
public void onReload(ReloadEvent event) {

if (event.isRemote()) {
log.debug("Received a remote ReloadEvent, triggering a GeoServer reload ({})", event);
log.debug("Disabling event publishing while processing {}", event);
CatalogApplicationEventPublisher.disable();
try {
rawGeoServer.reload(null, true);
rawGeoServer.reload();
log.debug("Reenabling event publishing after {}", event);
} catch (Exception e) {
log.error("Error reloading catalog: ", e);
} finally {
CatalogApplicationEventPublisher.enable();
}
} else {
log.debug("Ignoring local {}", event);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ void onRemoteUpdateSequenceEvent(UpdateSequenceEvent event) {
}
final long updateSequence = event.getUpdateSequence();
GeoServerInfo info = ModificationProxy.unwrap(configFacade.getGlobal());
if (null == info) return;
final long current = info.getUpdateSequence();
if (updateSequence > current) {
info.setUpdateSequence(updateSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public <T extends LifecycleEvent> RemoteGeoServerEvent expectOneLifecycleEvent(
return matches.stream().findFirst().get();
}

public <T extends InfoEvent> List<RemoteGeoServerEvent> allOf(
public <T extends GeoServerEvent> List<RemoteGeoServerEvent> allOf(
Class<T> payloadEventType, Predicate<T> eventFilter) {

return capturedEvents(payloadEventType)
Expand All @@ -132,15 +132,15 @@ public <T extends LifecycleEvent> List<RemoteGeoServerEvent> allOfLifecycle(
.toList();
}

public <T extends InfoEvent> List<RemoteGeoServerEvent> allOf(Class<T> payloadType) {
public <T extends GeoServerEvent> List<RemoteGeoServerEvent> allOf(Class<T> payloadType) {
return capturedEvents(payloadType).toList();
}

public <T extends InfoEvent> Optional<RemoteGeoServerEvent> first(Class<T> payloadType) {
public <T extends GeoServerEvent> Optional<RemoteGeoServerEvent> first(Class<T> payloadType) {
return capturedEvents(payloadType).findFirst();
}

private <T extends InfoEvent> Stream<RemoteGeoServerEvent> capturedEvents(
private <T extends GeoServerEvent> Stream<RemoteGeoServerEvent> capturedEvents(
Class<T> payloadType) {
return capturedEvents().filter(remote -> payloadType.isInstance(remote.getEvent()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*/
package org.geoserver.cloud.event.bus;

import static org.assertj.core.api.Assertions.assertThat;

import org.geoserver.cloud.event.lifecycle.LifecycleEvent;
import org.geoserver.cloud.event.lifecycle.ReloadEvent;
import org.geoserver.cloud.event.lifecycle.ResetEvent;
Expand Down Expand Up @@ -49,10 +51,13 @@ void testGeoServerHasExecutedReload() {
};
modifier.accept(geoserver);

// reload also triggers reset!
eventsCaptor.local().expectOneLifecycleEvent(ReloadEvent.class);
eventsCaptor.local().expectOneLifecycleEvent(ResetEvent.class);
eventsCaptor.remote().expectOneLifecycleEvent(ReloadEvent.class);
eventsCaptor.remote().expectOneLifecycleEvent(ResetEvent.class);

// reload implies reset, so shall not trigger a reset event
assertThat(eventsCaptor.local().allOf(ResetEvent.class)).isEmpty();
;
assertThat(eventsCaptor.remote().allOf(ResetEvent.class)).isEmpty();
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand All @@ -63,7 +64,7 @@
* @see InfoEvent LocalInfoEvent's class hierarchy
*/
@RequiredArgsConstructor
class CatalogApplicationEventPublisher {
public class CatalogApplicationEventPublisher {

private final @NonNull Consumer<? super InfoEvent> eventPublisher;
private final @NonNull Catalog catalog;
Expand All @@ -73,6 +74,8 @@ class CatalogApplicationEventPublisher {
private LocalCatalogEventPublisher publishingCatalogListener;
private LocalConfigEventPublisher publishingConfigListener;

private static final AtomicBoolean ENABLED = new AtomicBoolean(true);

public @PostConstruct void initialize() {
publishingCatalogListener = new LocalCatalogEventPublisher(this);
publishingConfigListener = new LocalConfigEventPublisher(this);
Expand All @@ -81,13 +84,31 @@ class CatalogApplicationEventPublisher {
geoServer.addListener(publishingConfigListener);
}

/**
* Disables event publishing. Make sure to call enable() once done forcing not to publish events
*/
public static void disable() {
ENABLED.set(false);
}

/** Re-enables event publishing */
public static void enable() {
ENABLED.set(true);
}

public static boolean enabled() {
return ENABLED.get();
}

void publish(@NonNull InfoEvent event) {
eventPublisher.accept(event);
if (enabled()) {
eventPublisher.accept(event);
}
}

@NonNull
Long incrementSequence() {
return this.updateSequenceIncrementor.get();
return enabled() ? this.updateSequenceIncrementor.get() : -1L;
}

@RequiredArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,38 @@ class GeoServerLifecycleEventPublisher implements GeoServerLifecycleHandler {
private final @NonNull Consumer<? super LifecycleEvent> eventPublisher;

void publish(@NonNull LifecycleEvent event) {
log.debug("Publishing {}", event);
eventPublisher.accept(event);
}

@Override
public void onReset() {
log.debug("Publishing the onReset event");

publish(new ResetEvent());
if (CatalogApplicationEventPublisher.enabled()) {
publish(new ResetEvent());
}
}

@Override
public void onDispose() {
log.debug("Ignoring the onDispose event");
public void beforeReload() {
if (CatalogApplicationEventPublisher.enabled()) {
// Thus, we want to inform all connected services as early as possible
// to activate reloading in parallel.
publish(new ReloadEvent());
log.debug("Disabling event publishing during reload()");
CatalogApplicationEventPublisher.disable();
}
}

@Override
public void beforeReload() {
// Thus, we want to inform all connected services as early as possible
// to activate reloading in parallel.
log.debug("Publishing the beforeReload event");

publish(new ReloadEvent());
public void onReload() {
if (!CatalogApplicationEventPublisher.enabled()) {
log.debug("Reenabling event publishing after reload()");
CatalogApplicationEventPublisher.enable();
}
}

@Override
public void onReload() {
log.debug("Ignoring the onReload event");
public void onDispose() {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
@JsonSubTypes.Type(value = ReloadEvent.class),
@JsonSubTypes.Type(value = ResetEvent.class)
})
@SuppressWarnings("serial")
public abstract class LifecycleEvent extends GeoServerEvent {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
@JsonTypeName("ReloadEvent")
@SuppressWarnings("serial")
public class ReloadEvent extends LifecycleEvent {
public ReloadEvent() {
// no-op, for serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT)
@JsonTypeName("ResetEvent")
@SuppressWarnings("serial")
public class ResetEvent extends LifecycleEvent {
public ResetEvent() {
// no-op, for serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,11 @@ void testGSLifeCycleDispatchOnReload() throws Exception {

// Check that there is no other event being triggered, we expect two ones.
List<LifecycleEvent> allEvents = listener.allOf(LifecycleEvent.class);
assertEquals(2, allEvents.size());
assertEquals(1, allEvents.size());

// And we expect them to be a (local) ResetEvent and a (local) ReloadEvent.
ReloadEvent reloadEvent = listener.expectOne(ReloadEvent.class);
assertTrue(reloadEvent.isLocal());
ResetEvent resetEvent = listener.expectOne(ResetEvent.class);
assertTrue(resetEvent.isLocal());
}

@Test
Expand Down
Loading

0 comments on commit d4efdb7

Please sign in to comment.