From 697d3008faba5414d0450c72f08778c1603b28b3 Mon Sep 17 00:00:00 2001 From: Peter Palaga Date: Wed, 3 Jan 2024 00:56:46 +0100 Subject: [PATCH] DevUI: Split the client and service lists into separate cards, add links to their WSDLs #1015 --- .../deployment/CXFServletInfosBuildItem.java | 22 ++ .../cxf/deployment/CxfClientProcessor.java | 73 +++-- .../CxfEndpointImplementationProcessor.java | 4 +- .../cxf/deployment/devui/DevUIProcessor.java | 72 ++++- .../main/resources/dev-ui/qwc-cxf-clients.js | 114 +++++++ .../main/resources/dev-ui/qwc-cxf-services.js | 167 ++++------ .../test/dev/DevUiRemoteStatsImpl.java | 36 +++ .../cxf/deployment/test/dev/DevUiStats.java | 21 ++ .../deployment/test/dev/DevUiStatsImpl.java | 34 ++ .../cxf/deployment/test/dev/DevUiTest.java | 83 +++++ .../java/io/quarkiverse/cxf/CXFRecorder.java | 3 - .../quarkiverse/cxf/ClientInjectionPoint.java | 39 +++ .../io/quarkiverse/cxf/CxfClientProducer.java | 38 ++- .../cxf/devui/CxfJsonRPCService.java | 76 ++++- .../cxf/devui/DevUiClientInfo.java | 39 +++ .../quarkiverse/cxf/devui/DevUiRecorder.java | 24 ++ .../cxf/devui/DevUiServiceInfo.java | 33 ++ .../cxf/transport/generated/AppendBuffer.java | 202 ++++++++++++ .../generated/VertxServletOutputStream.java | 294 ++++++++++++++++++ 19 files changed, 1205 insertions(+), 169 deletions(-) create mode 100644 extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CXFServletInfosBuildItem.java create mode 100644 extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-clients.js create mode 100644 extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiRemoteStatsImpl.java create mode 100644 extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStats.java create mode 100644 extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStatsImpl.java create mode 100644 extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiTest.java create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/ClientInjectionPoint.java create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiClientInfo.java create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiRecorder.java create mode 100644 extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiServiceInfo.java create mode 100644 extensions/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java create mode 100644 extensions/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CXFServletInfosBuildItem.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CXFServletInfosBuildItem.java new file mode 100644 index 000000000..72494cde7 --- /dev/null +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CXFServletInfosBuildItem.java @@ -0,0 +1,22 @@ +package io.quarkiverse.cxf.deployment; + +import io.quarkiverse.cxf.CXFServletInfos; +import io.quarkus.builder.item.SimpleBuildItem; +import io.quarkus.runtime.RuntimeValue; + +/** + * Holds the runtime {@link CXFServletInfos} reference. + */ +public final class CXFServletInfosBuildItem extends SimpleBuildItem { + private final RuntimeValue cxfServletInfos; + + public CXFServletInfosBuildItem(RuntimeValue cxfServletInfos) { + super(); + this.cxfServletInfos = cxfServletInfos; + } + + public RuntimeValue getCxfServletInfos() { + return cxfServletInfos; + } + +} \ No newline at end of file diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java index 0b09d00b2..502e8267e 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfClientProcessor.java @@ -11,6 +11,7 @@ import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.Stream; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Disposes; @@ -35,6 +36,7 @@ import io.quarkiverse.cxf.CXFClientData; import io.quarkiverse.cxf.CXFClientInfo; import io.quarkiverse.cxf.CXFRecorder; +import io.quarkiverse.cxf.ClientInjectionPoint; import io.quarkiverse.cxf.CxfClientConfig.HTTPConduitImpl; import io.quarkiverse.cxf.CxfClientProducer; import io.quarkiverse.cxf.CxfFixedConfig; @@ -178,34 +180,53 @@ private static AnnotationInstance findWebServiceClientAnnotation(IndexView index return null; } + public static Stream findClientInjectionPoints(IndexView index) { + return index.getAnnotations(CxfDotNames.CXFCLIENT_ANNOTATION).stream() + .map(annotationInstance -> { + final AnnotationTarget target = annotationInstance.target(); + Type type; + switch (target.kind()) { + case FIELD: + type = target.asField().type(); + break; + case METHOD_PARAMETER: + MethodParameterInfo paramInfo = target.asMethodParameter(); + MethodInfo method = paramInfo.method(); + type = method.parameterTypes().get(paramInfo.position()); + break; + default: + type = null; + break; + } + if (type != null) { + type = type.name().equals(CxfDotNames.INJECT_INSTANCE) ? type.asParameterizedType().arguments().get(0) + : type; + final String typeName = type.name().toString(); + try { + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + final Class sei = Class.forName(typeName, true, cl); + final AnnotationValue value = annotationInstance.value(); + return new ClientInjectionPoint(value != null ? value.asString() : "", sei); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Could not load Service Endpoint Interface " + typeName); + } + } else { + return null; + } + }) + .filter(ip -> ip != null) + .distinct(); + } + private static Map findClientSEIsInUse(IndexView index, CxfFixedConfig config) { final Map seiToClientConfig = new TreeMap<>(); - index.getAnnotations(CxfDotNames.CXFCLIENT_ANNOTATION).forEach(annotationInstance -> { - final AnnotationTarget target = annotationInstance.target(); - Type type; - switch (target.kind()) { - case FIELD: - type = target.asField().type(); - break; - case METHOD_PARAMETER: - MethodParameterInfo paramInfo = target.asMethodParameter(); - MethodInfo method = paramInfo.method(); - type = method.parameterTypes().get(paramInfo.position()); - break; - default: - type = null; - break; - } - if (type != null) { - type = type.name().equals(CxfDotNames.INJECT_INSTANCE) ? type.asParameterizedType().arguments().get(0) - : type; - final String typeName = type.name().toString(); - final ClientFixedConfig clientConfig = findClientConfig( - config, - Optional.ofNullable(annotationInstance.value()).map(AnnotationValue::asString).orElse(null), - typeName); - seiToClientConfig.put(typeName, clientConfig); - } + findClientInjectionPoints(index).forEach(clientInjectionPoint -> { + String sei = clientInjectionPoint.getSei().getName(); + final ClientFixedConfig clientConfig = findClientConfig( + config, + clientInjectionPoint.getConfigKey(), + sei); + seiToClientConfig.put(sei, clientConfig); }); return seiToClientConfig; } diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java index a7bce2075..d6e59cdae 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/CxfEndpointImplementationProcessor.java @@ -116,7 +116,7 @@ void collectEndpoints( @BuildStep @Record(ExecutionTime.RUNTIME_INIT) - void startRoute(CXFRecorder recorder, + CXFServletInfosBuildItem startRoute(CXFRecorder recorder, BuildProducer routes, BeanContainerBuildItem beanContainer, List cxfEndpoints, @@ -154,6 +154,8 @@ void startRoute(CXFRecorder recorder, LOGGER.debug( "Not registering a Vert.x handler for CXF as no WS endpoints were found at build time and no other extension requested it"); } + + return new CXFServletInfosBuildItem(infos); } private static String getMappingPath(String path) { diff --git a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java index 94f9e85f4..bfe99f276 100644 --- a/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java +++ b/extensions/core/deployment/src/main/java/io/quarkiverse/cxf/deployment/devui/DevUIProcessor.java @@ -1,12 +1,27 @@ package io.quarkiverse.cxf.deployment.devui; +import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; -import io.quarkiverse.cxf.deployment.CxfClientBuildItem; -import io.quarkiverse.cxf.deployment.CxfEndpointImplementationBuildItem; +import org.jboss.jandex.DotName; +import org.jboss.jandex.ParameterizedType; +import org.jboss.jandex.Type; +import org.jboss.jandex.Type.Kind; + +import io.quarkiverse.cxf.ClientInjectionPoint; +import io.quarkiverse.cxf.deployment.CXFServletInfosBuildItem; +import io.quarkiverse.cxf.deployment.CxfClientProcessor; import io.quarkiverse.cxf.devui.CxfJsonRPCService; +import io.quarkiverse.cxf.devui.DevUiRecorder; +import io.quarkus.arc.deployment.SyntheticBeanBuildItem; import io.quarkus.deployment.IsDevelopment; +import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.CombinedIndexBuildItem; +import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.devui.spi.JsonRPCProvidersBuildItem; import io.quarkus.devui.spi.page.CardPageBuildItem; import io.quarkus.devui.spi.page.Page; @@ -14,18 +29,21 @@ public class DevUIProcessor { @BuildStep(onlyIf = IsDevelopment.class) - public CardPageBuildItem pages(List services, - List clients) { + public CardPageBuildItem pages() { CardPageBuildItem cardPageBuildItem = new CardPageBuildItem(); - int total = services.size() + clients.size(); + cardPageBuildItem.addPage(Page.webComponentPageBuilder() + .title("Clients") + .icon("font-awesome-solid:message") + .componentLink("qwc-cxf-clients.js") + .dynamicLabelJsonRPCMethodName("getClientCount")); cardPageBuildItem.addPage(Page.webComponentPageBuilder() - .title("List of SOAP WS") - .icon("font-awesome-solid:cubes") + .title("Service Endpoints") + .icon("font-awesome-solid:gears") .componentLink("qwc-cxf-services.js") - .staticLabel(String.valueOf(total))); + .dynamicLabelJsonRPCMethodName("getServiceCount")); return cardPageBuildItem; } @@ -34,4 +52,42 @@ public CardPageBuildItem pages(List services JsonRPCProvidersBuildItem createJsonRPCServiceForCache() { return new JsonRPCProvidersBuildItem(CxfJsonRPCService.class); } + + @BuildStep(onlyIf = IsDevelopment.class) + @Record(ExecutionTime.STATIC_INIT) + void collectClients( + CombinedIndexBuildItem combinedIndexBuildItem, + BuildProducer synthetics, + DevUiRecorder recorder) { + final List injectionPoints = CxfClientProcessor.findClientInjectionPoints( + combinedIndexBuildItem.getIndex()) + .sorted(Comparator.comparing(ClientInjectionPoint::getConfigKey).thenComparing(cip -> cip.getSei().getName())) + .collect(Collectors.toList()); + synthetics.produce(SyntheticBeanBuildItem + .configure(List.class) + .types( + ParameterizedType.create( + DotName.createSimple(List.class.getName()), + Type.create(DotName.createSimple(ClientInjectionPoint.class.getName()), Kind.CLASS))) + .named("clientInjectionPoints") + .runtimeValue(recorder.clientInjectionPoints(injectionPoints)) + .done()); + } + + @BuildStep(onlyIf = IsDevelopment.class) + @Record(ExecutionTime.RUNTIME_INIT) + void collectServices( + CXFServletInfosBuildItem infos, + DevUiRecorder recorder) { + recorder.servletInfos(infos.getCxfServletInfos()); + } + + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + void shutDown( + DevUiRecorder recorder, + ShutdownContextBuildItem shutdownContext) { + recorder.shutdown(shutdownContext); + } + } diff --git a/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-clients.js b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-clients.js new file mode 100644 index 000000000..3c3958366 --- /dev/null +++ b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-clients.js @@ -0,0 +1,114 @@ +import { LitElement, html, css} from 'lit'; +import 'qui-card'; +import '@vaadin/progress-bar'; +import '@vaadin/grid'; +import { columnBodyRenderer } from '@vaadin/grid/lit.js'; +import { JsonRpc } from 'jsonrpc'; +import 'qui-ide-link'; + +/** + * This component shows the list of clients + */ +export class QwcCxfClients extends LitElement { + jsonRpc = new JsonRpc(this); + + static styles = css` + .cxf-table { + height: 100%; + padding-bottom: 10px; + } + + code { + font-size: 85%; + } + + .annotation { + color: var(--lumo-contrast-50pct); + } + + :host { + display: flex; + flex-direction:column; + gap: 20px; + padding-left: 10px; + padding-right: 10px; + } + .nothing-found { + padding: 5px; + }`; + + static properties = { + _clients: {state: true} + }; + + constructor() { + super(); + this._clients = null; + } + + connectedCallback() { + super.connectedCallback(); + this.jsonRpc.getClients().then(jsonRpcResponse => { + this._clients = jsonRpcResponse.result; + }); + } + + render() { + if (this._clients) { + if (this._clients.length > 0) { + return this._renderClientList(); + } else { + return html`
No clients found
`; + } + } else { + return html``; + } + } + + _renderClientList(){ + return html` + + + + + + + + + `; + } + + _classNameRenderer(client){ + return html` + @CXFClient("${client.configKey}") + ${client.sei} + `; + } + + _addressRenderer(client) { + return html` +   + ${client.address} + `; + } + + _wsdlRenderer(client) { + return html` +   + ${client.wsdl} + `; + } + +} +customElements.define('qwc-cxf-clients', QwcCxfClients); \ No newline at end of file diff --git a/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js index 5bd586877..ff2819c3b 100644 --- a/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js +++ b/extensions/core/deployment/src/main/resources/dev-ui/qwc-cxf-services.js @@ -1,18 +1,31 @@ -import { LitElement, html, css} from 'lit'; +import { LitElement, html, css} from 'lit'; import 'qui-card'; import '@vaadin/progress-bar'; -import '@vaadin/grid'; +import '@vaadin/grid'; import { columnBodyRenderer } from '@vaadin/grid/lit.js'; import { JsonRpc } from 'jsonrpc'; import 'qui-ide-link'; /** - * This component shows the List SOAP Web Services and clients + * This component shows the list of Service endpoints */ -export class QwcCxfServices extends LitElement { +export class QwcCxfServices extends LitElement { jsonRpc = new JsonRpc(this); - - static styles = css` + + static styles = css` + .cxf-table { + height: 100%; + padding-bottom: 10px; + } + + code { + font-size: 85%; + } + + .service-sei { + color: var(--lumo-contrast-50pct); + } + :host { display: flex; flex-direction:column; @@ -23,125 +36,67 @@ export class QwcCxfServices extends LitElement { .nothing-found { padding: 5px; }`; - + static properties = { - _services: {state: true}, - _clients: {state: true} + _services: {state: true} }; - - constructor() { + + constructor() { super(); this._services = null; - this._clients = null; } connectedCallback() { super.connectedCallback(); - this.jsonRpc.getServices().then(jsonRpcResponse => { + this.jsonRpc.getServices().then(jsonRpcResponse => { this._services = jsonRpcResponse.result; }); - this.jsonRpc.getClients().then(jsonRpcResponse => { - this._clients = jsonRpcResponse.result; - }); } - render() { - return html`${this._renderSoapServiceCard()} - ${this._renderSoapClientsCard()}`; - } - - _renderSoapServiceCard(){ - return html` -
- ${this._renderSoapServices()} -
-
`; - } - - _renderSoapServices(){ - if(this._services){ - if(this._services.length>0) { - return html` - - - - - - `; + render() { + if (this._services) { + if (this._services.length > 0) { + return this._renderServiceList(); }else { - return html`
No SOAP Services found
`; + return html`
No service endpoints found
`; } - }else{ + } else { return html``; } } - - _classNameRenderer(service){ - return html`${service.className}`; - } - - _pathRenderer(service) { - return html`${service.path}${service.relativePath}`; - } - - _renderSoapClientsCard(){ - return html` -
- ${this._renderSoapClients()} -
-
`; - } - - _renderSoapClients(){ - if(this._clients){ - if(this._clients.length>0) { - - return html` - - - - - `; - }else { - return html`
No SOAP Clients found
`; - } - }else{ - return html``; - } + _renderServiceList(){ + return html` + + + + + + `; } - - _seiRenderer(client){ - return html`${client.sei}`; + + _classNameRenderer(service){ + /* service.sei always the same as service.className + ${service.sei} + */ + return html` + ${service.implementor} + `; } - - _endpointAddressRenderer(client){ - if(client.wsdlUrl){ - return html`${client.wsdlUrl}`; - }else if(client.endpointAddress){ - return html`${client.endpointAddress}`; - }else { - return html`N/A`; - } + + _wsdlRenderer(service) { + return html`${service.path}?wsdl`; } - + } -customElements.define('qwc-cxf-services', QwcCxfServices); \ No newline at end of file +customElements.define('qwc-cxf-services', QwcCxfServices); diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiRemoteStatsImpl.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiRemoteStatsImpl.java new file mode 100644 index 000000000..71f3c6a65 --- /dev/null +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiRemoteStatsImpl.java @@ -0,0 +1,36 @@ +package io.quarkiverse.cxf.deployment.test.dev; + +import jakarta.jws.WebService; + +import io.quarkiverse.cxf.annotation.CXFClient; + +@WebService(name = "DevUiStats", serviceName = "DevUiStats") +public class DevUiRemoteStatsImpl implements DevUiStats { + + @CXFClient("stats") + DevUiStats stats; + + @CXFClient + DevUiStats keyLessStats; + + @Override + public int getClientCount() { + return stats.getClientCount(); + } + + @Override + public String getClient(int index) { + return stats.getClient(index); + } + + @Override + public int getServiceCount() { + return stats.getServiceCount(); + } + + @Override + public String getService(int index) { + return stats.getService(index); + } + +} diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStats.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStats.java new file mode 100644 index 000000000..f22be07e5 --- /dev/null +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStats.java @@ -0,0 +1,21 @@ +package io.quarkiverse.cxf.deployment.test.dev; + +import jakarta.jws.WebMethod; +import jakarta.jws.WebService; + +@WebService(serviceName = "DevUiStats") +public interface DevUiStats { + + @WebMethod + int getClientCount(); + + @WebMethod + String getClient(int index); + + @WebMethod + int getServiceCount(); + + @WebMethod + String getService(int index); + +} \ No newline at end of file diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStatsImpl.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStatsImpl.java new file mode 100644 index 000000000..6bf543a9e --- /dev/null +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiStatsImpl.java @@ -0,0 +1,34 @@ +package io.quarkiverse.cxf.deployment.test.dev; + +import jakarta.inject.Inject; +import jakarta.jws.WebService; + +import io.quarkiverse.cxf.devui.CxfJsonRPCService; + +@WebService(name = "DevUiStats", serviceName = "DevUiStats") +public class DevUiStatsImpl implements DevUiStats { + + @Inject + CxfJsonRPCService rpcService; + + @Override + public int getClientCount() { + return rpcService.getClientCount(); + } + + @Override + public String getClient(int index) { + return rpcService.getClients().get(index).toString(); + } + + @Override + public int getServiceCount() { + return rpcService.getServiceCount(); + } + + @Override + public String getService(int index) { + return rpcService.getServices().get(index).toString(); + } + +} diff --git a/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiTest.java b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiTest.java new file mode 100644 index 000000000..6402ca558 --- /dev/null +++ b/extensions/core/deployment/src/test/java/io/quarkiverse/cxf/deployment/test/dev/DevUiTest.java @@ -0,0 +1,83 @@ +package io.quarkiverse.cxf.deployment.test.dev; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Properties; + +import javax.xml.namespace.QName; + +import jakarta.xml.ws.Service; + +import org.assertj.core.api.Assertions; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.Asset; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkiverse.cxf.test.QuarkusCxfClientTestUtil; +import io.quarkus.test.QuarkusDevModeTest; + +public class DevUiTest { + + @RegisterExtension + static final QuarkusDevModeTest TEST = new QuarkusDevModeTest() + .setArchiveProducer(() -> ShrinkWrap + .create(JavaArchive.class) + .addClasses( + DevUiStats.class, + DevUiStatsImpl.class, + DevUiRemoteStatsImpl.class) + .addAsResource(applicationProperties(), "application.properties")); + + @Test + void clientsAndServices() throws InterruptedException { + final DevUiStats client = getClient(DevUiStats.class, "/services/stats"); + Assertions.assertThat(client.getClientCount()).isEqualTo(2); + Assertions.assertThat(client.getServiceCount()).isEqualTo(2); + + int i = 0; + Assertions.assertThat(client.getClient(i++)).isEqualTo( + "DevUiClientInfo [configKey=, sei=io.quarkiverse.cxf.deployment.test.dev.DevUiStats, address=http://localhost:8080/services/stats, wsdl=null]"); + Assertions.assertThat(client.getClient(i++)).isEqualTo( + "DevUiClientInfo [configKey=stats, sei=io.quarkiverse.cxf.deployment.test.dev.DevUiStats, address=http://localhost:8080/services/stats, wsdl=null]"); + + i = 0; + Assertions.assertThat(client.getService(i++)).isEqualTo( + "DevUiServiceInfo [path=/services/remote-stats, implementor=io.quarkiverse.cxf.deployment.test.dev.DevUiRemoteStatsImpl]"); + Assertions.assertThat(client.getService(i++)).isEqualTo( + "DevUiServiceInfo [path=/services/stats, implementor=io.quarkiverse.cxf.deployment.test.dev.DevUiStatsImpl]"); + } + + public static Asset applicationProperties() { + Writer writer = new StringWriter(); + Properties props = new Properties(); + props.setProperty("quarkus.cxf.endpoint.\"/stats\".implementor", DevUiStatsImpl.class.getName()); + props.setProperty("quarkus.cxf.endpoint.\"/remote-stats\".implementor", DevUiRemoteStatsImpl.class.getName()); + props.setProperty("quarkus.cxf.client.stats.client-endpoint-url", "http://localhost:8080/services/stats"); + props.setProperty("quarkus.cxf.client.stats.service-interface", DevUiStats.class.getName()); + try { + props.store(writer, ""); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new StringAsset(writer.toString()); + } + + public static T getClient(Class serviceInterface, String path) { + try { + final String namespace = QuarkusCxfClientTestUtil.getDefaultNameSpace(serviceInterface); + final URL serviceUrl = new URL("http://localhost:8080" + path + "?wsdl"); + final QName qName = new QName(namespace, serviceInterface.getSimpleName()); + final Service service = Service.create(serviceUrl, qName); + return service.getPort(serviceInterface); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java index 2aa4ef59f..a52c7e235 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CXFRecorder.java @@ -10,7 +10,6 @@ import org.apache.cxf.transport.http.HTTPConduitFactory; import org.jboss.logging.Logger; -import io.quarkiverse.cxf.devui.CxfJsonRPCService; import io.quarkiverse.cxf.transport.CxfHandler; import io.quarkiverse.cxf.transport.VertxDestinationFactory; import io.quarkus.arc.runtime.BeanContainer; @@ -135,8 +134,6 @@ public Handler initServer( HttpConfiguration httpConfiguration, CxfFixedConfig fixedConfig) { LOGGER.trace("init server"); - // There may be a better way to handle this - CxfJsonRPCService.setServletInfos(infos.getValue()); return new CxfHandler(infos.getValue(), beanContainer, httpConfiguration, fixedConfig); } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/ClientInjectionPoint.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/ClientInjectionPoint.java new file mode 100644 index 000000000..79f60833b --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/ClientInjectionPoint.java @@ -0,0 +1,39 @@ +package io.quarkiverse.cxf; + +import java.util.Objects; + +public class ClientInjectionPoint { + private final String configKey; + private final Class sei; + + public ClientInjectionPoint(String configKey, Class sei) { + super(); + this.configKey = configKey; + this.sei = sei; + } + + public String getConfigKey() { + return configKey; + } + + public Class getSei() { + return sei; + } + + @Override + public int hashCode() { + return Objects.hash(configKey, sei); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ClientInjectionPoint other = (ClientInjectionPoint) obj; + return Objects.equals(configKey, other.configKey) && Objects.equals(sei, other.sei); + } +} \ No newline at end of file diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java index c323b0ebc..3522be4bd 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/CxfClientProducer.java @@ -13,6 +13,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.net.ssl.HostnameVerifier; @@ -146,6 +147,7 @@ private Object produceCxfClient(CXFClientInfo cxfClientInfo) { QuarkusJaxWsProxyFactoryBean factory = new QuarkusJaxWsProxyFactoryBean(quarkusClientFactoryBean, interfaces); final Map props = new LinkedHashMap<>(); factory.setProperties(props); + props.put(CXFClientInfo.class.getName(), cxfClientInfo); factory.setServiceClass(seiClass); LOGGER.debugf("using servicename {%s}%s", cxfClientInfo.getWsNamespace(), cxfClientInfo.getWsName()); factory.setServiceName(new QName(cxfClientInfo.getWsNamespace(), cxfClientInfo.getWsName())); @@ -376,21 +378,41 @@ protected static CXFClientInfo selectorCXFClientInfo( // If injection point is annotated with @CXFClient then determine a // configuration by looking up annotated config value: + final String configKey; if (ip.getAnnotated().isAnnotationPresent(CXFClient.class)) { - CXFClient anno = ip.getAnnotated().getAnnotation(CXFClient.class); - String configKey = anno.value(); + final CXFClient anno = ip.getAnnotated().getAnnotation(CXFClient.class); + configKey = anno.value(); + } else { + configKey = ""; + } + return selectorCXFClientInfo( + config, + fixedConfig, + meta, + configKey, + () -> new IllegalStateException( + "quarkus.cxf.client.\"" + configKey + "\" is referenced in " + ip.getMember() + + " but no such build time configuration entry exists")); + } + + public static CXFClientInfo selectorCXFClientInfo( + CxfConfig config, + CxfFixedConfig fixedConfig, + CXFClientData meta, + String configKey, + Supplier exceptionSupplier) { + // If injection point is annotated with @CXFClient then determine a + // configuration by looking up annotated config value: + + if (configKey != null && !configKey.isEmpty()) { if (config.isClientPresent(configKey)) { return new CXFClientInfo(meta, config.getClient(configKey), configKey); } - // If config-key is present and not default: This is an error: - if (configKey != null && !configKey.isEmpty()) { - throw new IllegalStateException( - "quarkus.cxf.\"" + configKey + "\" is referenced in " + ip.getMember() - + " but no such build time configuration entry exists"); - } + throw exceptionSupplier.get(); } + // User did not specify any client config value. Thus we make a smart guess // about which configuration is to be used. // diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java index 9d5c23b06..7ec1cd137 100644 --- a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/CxfJsonRPCService.java @@ -1,41 +1,83 @@ package io.quarkiverse.cxf.devui; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; -import jakarta.enterprise.inject.spi.CDI; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import io.quarkiverse.cxf.CXFClientData; import io.quarkiverse.cxf.CXFClientInfo; -import io.quarkiverse.cxf.CXFServletInfo; import io.quarkiverse.cxf.CXFServletInfos; +import io.quarkiverse.cxf.ClientInjectionPoint; +import io.quarkiverse.cxf.CxfClientProducer; +import io.quarkiverse.cxf.CxfConfig; +import io.quarkiverse.cxf.CxfFixedConfig; +import io.quarkus.arc.Arc; public class CxfJsonRPCService { - private static CXFServletInfos cxfServletInfos; + private static List servletInfos = Collections.emptyList(); - public List getServices() { - List servletInfos = cxfServletInfos != null ? new ArrayList<>(cxfServletInfos.getInfos()) - : new ArrayList<>(); - servletInfos.sort(Comparator.comparing(CXFServletInfo::getSei)); + @Inject + @Named("clientInjectionPoints") + List clientInjectionPoints; + + @Inject + CxfConfig config; + + @Inject + CxfFixedConfig fixedConfig; + + public List getServices() { return servletInfos; } - public List getClients() { - List clientInfos = new ArrayList<>(allClientInfos()); - clientInfos.sort(Comparator.comparing(CXFClientInfo::getSei)); - return clientInfos; + public int getServiceCount() { + return servletInfos.size(); } - private static Collection allClientInfos() { - return CDI.current().select(CXFClientInfo.class).stream().collect(Collectors.toCollection(ArrayList::new)); + public int getClientCount() { + return clientInjectionPoints.size(); } - public static void setServletInfos(CXFServletInfos infos) { - if (cxfServletInfos == null) { - cxfServletInfos = infos; + public List getClients() { + List result = new ArrayList<>(clientInjectionPoints.size()); + for (ClientInjectionPoint ip : clientInjectionPoints) { + final CXFClientData cxfClientData = (CXFClientData) Arc.container().instance(ip.getSei().getName()).get(); + + final CXFClientInfo clientInfo = CxfClientProducer.selectorCXFClientInfo( + config, + fixedConfig, + cxfClientData, + ip.getConfigKey(), + () -> new IllegalStateException("Cannot find quarkus.cxf.client.\"" + ip.getConfigKey() + "\"")); + + final DevUiClientInfo devUiIInfo = new DevUiClientInfo( + ip.getConfigKey(), + ip.getSei().getName(), + clientInfo.getEndpointAddress(), + clientInfo.getWsdlUrl()); + + result.add(devUiIInfo); } + result.sort(Comparator.comparing(DevUiClientInfo::getSei)); + return result; } + + public static void setServletInfos(CXFServletInfos infos) { + servletInfos = Collections.unmodifiableList( + infos.getInfos().stream() + .map(DevUiServiceInfo::of) + .sorted(Comparator.comparing(DevUiServiceInfo::getImplementor)) + .collect(Collectors.toList())); + } + + public static void shutdown() { + servletInfos = Collections.emptyList(); + } + } diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiClientInfo.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiClientInfo.java new file mode 100644 index 000000000..fcaaaa698 --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiClientInfo.java @@ -0,0 +1,39 @@ +package io.quarkiverse.cxf.devui; + +public class DevUiClientInfo { + + private final String configKey; + private final String sei; + private final String address; + private final String wsdl; + + public DevUiClientInfo(String configKey, String sei, String address, String wsdl) { + super(); + this.configKey = configKey; + this.sei = sei; + this.address = address; + this.wsdl = wsdl; + } + + public String getConfigKey() { + return configKey; + } + + public String getSei() { + return sei; + } + + public String getAddress() { + return address; + } + + public String getWsdl() { + return wsdl; + } + + @Override + public String toString() { + return "DevUiClientInfo [configKey=" + configKey + ", sei=" + sei + ", address=" + address + ", wsdl=" + wsdl + "]"; + } + +} \ No newline at end of file diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiRecorder.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiRecorder.java new file mode 100644 index 000000000..a2a23e2fc --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiRecorder.java @@ -0,0 +1,24 @@ +package io.quarkiverse.cxf.devui; + +import java.util.List; + +import io.quarkiverse.cxf.CXFServletInfos; +import io.quarkiverse.cxf.ClientInjectionPoint; +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.ShutdownContext; +import io.quarkus.runtime.annotations.Recorder; + +@Recorder +public class DevUiRecorder { + public RuntimeValue> clientInjectionPoints(List injectionPoints) { + return new RuntimeValue<>(injectionPoints); + } + + public void servletInfos(RuntimeValue cxfServletInfos) { + CxfJsonRPCService.setServletInfos(cxfServletInfos.getValue()); + } + + public void shutdown(ShutdownContext context) { + context.addShutdownTask(CxfJsonRPCService::shutdown); + } +} diff --git a/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiServiceInfo.java b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiServiceInfo.java new file mode 100644 index 000000000..60abff718 --- /dev/null +++ b/extensions/core/runtime/src/main/java/io/quarkiverse/cxf/devui/DevUiServiceInfo.java @@ -0,0 +1,33 @@ +package io.quarkiverse.cxf.devui; + +import io.quarkiverse.cxf.CXFServletInfo; + +public class DevUiServiceInfo { + + public static DevUiServiceInfo of(CXFServletInfo info) { + return new DevUiServiceInfo(info.getPath() + info.getRelativePath(), info.getClassName()); + } + + private final String path; + private final String implementor; + + public DevUiServiceInfo(String path, String implementor) { + super(); + this.path = path; + this.implementor = implementor; + } + + public String getPath() { + return path; + } + + public String getImplementor() { + return implementor; + } + + @Override + public String toString() { + return "DevUiServiceInfo [path=" + path + ", implementor=" + implementor + "]"; + } + +} \ No newline at end of file diff --git a/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java new file mode 100644 index 000000000..3b8414d0f --- /dev/null +++ b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/AppendBuffer.java @@ -0,0 +1,202 @@ +package io.quarkiverse.cxf.transport.generated; + +import java.util.ArrayDeque; +import java.util.Objects; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; + +/** + * Adapted by sync-quarkus-classes.groovy from + * ResteasyReactiveOutputStream + * from Quarkus. + * + *

+ * + * It is a bounded (direct) buffer container that can keep on accepting data till {@link #capacity} is exhausted.
+ * In order to keep appending on it, it can {@link #clear} and consolidate its content as a {@link ByteBuf}. + */ +final class AppendBuffer { + + private final ByteBufAllocator allocator; + + private final int minChunkSize; + + private final int capacity; + + private ByteBuf buffer; + + private ArrayDeque otherBuffers; + + private int size; + + private AppendBuffer(ByteBufAllocator allocator, int minChunkSize, int capacity) { + this.allocator = allocator; + this.minChunkSize = Math.min(minChunkSize, capacity); + this.capacity = capacity; + } + + /** + * This buffer append data in a single eagerly allocated {@link ByteBuf}. + */ + public static AppendBuffer eager(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, capacity, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s sized as each {@code len} in {@link #append}.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer exact(ByteBufAllocator allocator, int capacity) { + return new AppendBuffer(allocator, 0, capacity); + } + + /** + * This buffer append data in multiples {@link ByteBuf}s which minimum capacity is {@code minChunkSize} or + * as each {@code len}, if greater than it.
+ * The data is consolidated in a single {@link CompositeByteBuf} on {@link #clear}. + */ + public static AppendBuffer withMinChunks(ByteBufAllocator allocator, int minChunkSize, int capacity) { + return new AppendBuffer(allocator, minChunkSize, capacity); + } + + private ByteBuf lastBuffer() { + if (otherBuffers == null || otherBuffers.isEmpty()) { + return buffer; + } + return otherBuffers.peekLast(); + } + + /** + * It returns how many bytes have been appended
+ * If returns a value different from {@code len}, is it required to invoke {@link #clear} + * that would refill the available capacity till {@link #capacity()} + */ + public int append(byte[] bytes, int off, int len) { + Objects.requireNonNull(bytes); + if (len == 0) { + return 0; + } + int alreadyWritten = 0; + if (minChunkSize > 0) { + var lastBuffer = lastBuffer(); + if (lastBuffer != null) { + int availableOnLast = lastBuffer.writableBytes(); + if (availableOnLast > 0) { + int toWrite = Math.min(len, availableOnLast); + lastBuffer.writeBytes(bytes, off, toWrite); + size += toWrite; + len -= toWrite; + // we stop if there's no more to append + if (len == 0) { + return toWrite; + } + off += toWrite; + alreadyWritten = toWrite; + } + } + } + final int availableCapacity = capacity - size; + if (availableCapacity == 0) { + return alreadyWritten; + } + // we can still write some + int toWrite = Math.min(len, availableCapacity); + assert toWrite > 0; + final int chunkCapacity; + if (minChunkSize > 0) { + // Cannot allocate less than minChunkSize, till the limit of capacity left + chunkCapacity = Math.min(Math.max(minChunkSize, toWrite), availableCapacity); + } else { + chunkCapacity = toWrite; + } + var tmpBuf = allocator.directBuffer(chunkCapacity); + try { + tmpBuf.writeBytes(bytes, off, toWrite); + } catch (Throwable t) { + tmpBuf.release(); + throw t; + } + if (buffer == null) { + buffer = tmpBuf; + } else { + boolean resetOthers = false; + try { + if (otherBuffers == null) { + otherBuffers = new ArrayDeque<>(); + resetOthers = true; + } + otherBuffers.add(tmpBuf); + } catch (Throwable t) { + rollback(alreadyWritten, tmpBuf, resetOthers); + throw t; + } + } + size += toWrite; + return toWrite + alreadyWritten; + } + + private void rollback(int alreadyWritten, ByteBuf tmpBuf, boolean resetOthers) { + tmpBuf.release(); + if (resetOthers) { + otherBuffers = null; + } + if (alreadyWritten > 0) { + var last = lastBuffer(); + last.writerIndex(last.writerIndex() - alreadyWritten); + size -= alreadyWritten; + assert last.writerIndex() > 0; + } + } + + public ByteBuf clear() { + var firstBuf = buffer; + if (firstBuf == null) { + return null; + } + var others = otherBuffers; + if (others == null || others.isEmpty()) { + size = 0; + buffer = null; + // super fast-path + return firstBuf; + } + return clearBuffers(); + } + + private CompositeByteBuf clearBuffers() { + var firstBuf = buffer; + var others = otherBuffers; + var batch = allocator.compositeDirectBuffer(1 + others.size()); + try { + buffer = null; + size = 0; + batch.addComponent(true, 0, firstBuf); + for (int i = 0, othersCount = others.size(); i < othersCount; i++) { + // if addComponent fail, it takes care of releasing curr and throwing the exception: + batch.addComponent(true, 1 + i, others.poll()); + } + return batch; + } catch (Throwable anyError) { + batch.release(); + releaseOthers(others); + throw anyError; + } + } + + private static void releaseOthers(ArrayDeque others) { + ByteBuf buf; + while ((buf = others.poll()) != null) { + buf.release(); + } + } + + public int capacity() { + return capacity; + } + + public int availableCapacity() { + return capacity - size; + } +} diff --git a/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java new file mode 100644 index 000000000..1a92283e7 --- /dev/null +++ b/extensions/src/main/java/io/quarkiverse/cxf/transport/generated/VertxServletOutputStream.java @@ -0,0 +1,294 @@ +package io.quarkiverse.cxf.transport.generated; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import jakarta.servlet.ServletOutputStream; +import jakarta.servlet.WriteListener; +import org.jboss.logging.Logger; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.quarkiverse.cxf.transport.VertxReactiveRequestContext; +import io.quarkus.vertx.core.runtime.VertxBufferImpl; +import io.vertx.core.AsyncResult; +import io.vertx.core.Context; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; + +/** + * Adapted by sync-quarkus-classes.groovy from + * ResteasyReactiveOutputStream + * from Quarkus. + */ +public class VertxServletOutputStream extends ServletOutputStream { + + private static final Logger log = Logger.getLogger("org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveOutputStream"); + + private final VertxReactiveRequestContext context; + + protected final HttpServerRequest request; + + private final AppendBuffer appendBuffer; + + private boolean committed; + + private boolean closed; + + protected boolean waitingForDrain; + + protected boolean drainHandlerRegistered; + + protected boolean first = true; + + protected Throwable throwable; + + private ByteArrayOutputStream overflow; + + public VertxServletOutputStream(VertxReactiveRequestContext context) { + this.context = context; + this.request = context.getContext().request(); + this.appendBuffer = AppendBuffer.withMinChunks(PooledByteBufAllocator.DEFAULT, context.getDeployment().getResteasyReactiveConfig().getMinChunkSize(), context.getDeployment().getResteasyReactiveConfig().getOutputBufferSize()); + request.response().exceptionHandler(new Handler() { + + @Override + public void handle(Throwable event) { + throwable = event; + log.debugf(event, "IO Exception "); + //TODO: do we need this? + terminateResponse(); + request.connection().close(); + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + } + } + }); + context.getContext().addEndHandler(new Handler>() { + + @Override + public void handle(AsyncResult event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + } + terminateResponse(); + } + }); + } + + public void terminateResponse() { + } + + Buffer createBuffer(ByteBuf data) { + return new VertxBufferImpl(data); + } + + public void write(ByteBuf data, boolean last) throws IOException { + if (last && data == null) { + request.response().end((Handler>) null); + return; + } + //do all this in the same lock + synchronized (request.connection()) { + try { + boolean bufferRequired = awaitWriteable() || (overflow != null && overflow.size() > 0); + if (bufferRequired) { + //just buffer everything + registerDrainHandler(); + if (overflow == null) { + overflow = new ByteArrayOutputStream(); + } + if (data.hasArray()) { + overflow.write(data.array(), data.arrayOffset() + data.readerIndex(), data.readableBytes()); + } else { + data.getBytes(data.readerIndex(), overflow, data.readableBytes()); + } + if (last) { + closed = true; + } + data.release(); + } else { + if (last) { + request.response().end(createBuffer(data), null); + } else { + request.response().write(createBuffer(data), null); + } + } + } catch (Exception e) { + if (data != null && data.refCnt() > 0) { + data.release(); + } + throw new IOException("Failed to write", e); + } + } + } + + private boolean awaitWriteable() throws IOException { + if (Context.isOnEventLoopThread()) { + return request.response().writeQueueFull(); + } + if (first) { + first = false; + return false; + } + assert Thread.holdsLock(request.connection()); + while (request.response().writeQueueFull()) { + if (throwable != null) { + throw new IOException(throwable); + } + if (request.response().closed()) { + throw new IOException("Connection has been closed"); + } + registerDrainHandler(); + try { + waitingForDrain = true; + request.connection().wait(); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } finally { + waitingForDrain = false; + } + } + return false; + } + + private void registerDrainHandler() { + if (!drainHandlerRegistered) { + drainHandlerRegistered = true; + Handler handler = new Handler() { + + @Override + public void handle(Void event) { + synchronized (request.connection()) { + if (waitingForDrain) { + request.connection().notifyAll(); + } + if (overflow != null) { + if (overflow.size() > 0) { + if (closed) { + request.response().end(Buffer.buffer(overflow.toByteArray()), null); + } else { + request.response().write(Buffer.buffer(overflow.toByteArray()), null); + } + overflow.reset(); + } + } + } + } + }; + request.response().drainHandler(handler); + request.response().closeHandler(handler); + } + } + + /** + * {@inheritDoc} + */ + public void write(final int b) throws IOException { + write(new byte[] { (byte) b }, 0, 1); + } + + /** + * {@inheritDoc} + */ + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + /** + * {@inheritDoc} + */ + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 1) { + return; + } + if (closed) { + throw new IOException("Stream is closed"); + } + int rem = len; + int idx = off; + try { + while (rem > 0) { + final int written = appendBuffer.append(b, idx, rem); + if (written < rem) { + writeBlocking(appendBuffer.clear(), false); + } + rem -= written; + idx += written; + } + } catch (Exception e) { + throw new IOException(e); + } + } + + public void writeBlocking(ByteBuf buffer, boolean finished) throws IOException { + prepareWrite(buffer, finished); + write(buffer, finished); + } + + private void prepareWrite(ByteBuf buffer, boolean finished) throws IOException { + if (!committed) { + committed = true; + if (finished) { + final HttpServerResponse response = request.response(); + if (!response.headWritten()) { + if (buffer == null) { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, "0"); + } else { + response.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buffer.readableBytes())); + } + } + } else { + request.response().setChunked(true); + } + } + } + + /** + * {@inheritDoc} + */ + public void flush() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + try { + var toFlush = appendBuffer.clear(); + if (toFlush != null) { + writeBlocking(toFlush, false); + } + } catch (Exception e) { + throw new IOException(e); + } + } + + /** + * {@inheritDoc} + */ + public void close() throws IOException { + if (closed) + return; + try { + writeBlocking(appendBuffer.clear(), true); + } catch (Exception e) { + throw new IOException(e); + } finally { + closed = true; + } + } + + @Override + public boolean isReady() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteListener(WriteListener writeListener) { + throw new UnsupportedOperationException(); + } +}