Skip to content

Commit

Permalink
[fix][broker] Avoid bundle unload destination broker be set as an ina…
Browse files Browse the repository at this point in the history
…ctive broker. (apache#19244)

Co-authored-by: nicklixinyang <[email protected]>
  • Loading branch information
Nicklee007 and nicklixinyang authored Jan 17, 2023
1 parent b4d5857 commit 299bd70
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
Expand Down Expand Up @@ -890,13 +891,34 @@ public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
pulsar().getLoadManager().get().getAvailableBrokersAsync()
.thenApply(brokers ->
StringUtils.isNotBlank(destinationBroker) ? brokers.contains(destinationBroker) : true)
.thenAccept(isActiveDestination -> {
if (isActiveDestination) {
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}",
clientAppId(), bundleRange);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
} else {
log.warn("[{}] Failed to unload namespace bundle {}/{} to inactive broker {}.",
clientAppId(), namespaceName, bundleRange, destinationBroker);
resumeAsyncResponseExceptionally(asyncResponse,
new BrokerServiceException.NotAllowedException(
"Not allowed unload namespace bundle to inactive destination broker"));
}
}).exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.admin.impl.OffloaderObjectsScannerUtils;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand Down Expand Up @@ -815,13 +817,34 @@ public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(tenant, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
pulsar().getLoadManager().get().getAvailableBrokersAsync()
.thenApply(brokers ->
StringUtils.isNotBlank(destinationBroker) ? brokers.contains(destinationBroker) : true)
.thenAccept(isActiveDestination -> {
if (isActiveDestination) {
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}",
clientAppId(), bundleRange);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
} else {
log.warn("[{}] Failed to unload namespace bundle {}/{} to inactive broker {}.",
clientAppId(), namespaceName, bundleRange, destinationBroker);
resumeAsyncResponseExceptionally(asyncResponse,
new BrokerServiceException.NotAllowedException(
"Not allowed unload namespace bundle to inactive destination broker"));
}
}).exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to unload namespace bundle {}/{}",
clientAppId(), namespaceName, bundleRange, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,13 @@ public void testNamespacesApiRedirects() throws Exception {
assertEquals(captor.getValue().getResponse().getLocation().toString(),
UriBuilder.fromUri(uri).host("broker-usc.com").port(8080).toString());

// check the bundle should not unload to an inactive destination broker
namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster,
this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, "inactive_destination:8080");
captor = ArgumentCaptor.forClass(WebApplicationException.class);
verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.CONFLICT.getStatusCode());

uri = URI.create(pulsar.getWebServiceAddress() + "/admin/namespace/"
+ this.testGlobalNamespaces.get(0).toString() + "/configversion");
doReturn(uri).when(uriInfo).getRequestUri();
Expand Down

0 comments on commit 299bd70

Please sign in to comment.