-
Notifications
You must be signed in to change notification settings - Fork 926
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support micrometer context-propagation (#5577)
### Motivation: - Related Issue : #5145 - `Armeria` already support context-propagation to maintain `RequestContext` during executing Reactor code. How it requires maintenance. - `Reactor` integrate `micro-meter:context-propagation` to do context-propagation during `Flux`, `Mono` officially. thus, it would be better to migrate from `RequestContextHook` to `RequestContextPropagationHooks` because it can reduce maintenance cost. ### Modifications: - Add new `Hook` for `Reactor`. - Add new `ThreadLocalAccessor` for `micro-meter:context-propagation` to main `RequestContext` during executing Reactor code like `Mono`, `Flux`. - Add new config `enableContextPropagation` to integrate `micro-meter:context-propagation` with `spring-boot3`. ### Result: - Closes #5145 - If user want to use `micrometer:context-propagation` to maintain `RequestContext` during executing Reactor code like `Mono`, `Flux`, just call `RequestContextPropagationHook.enable()`. --------- Co-authored-by: minux <[email protected]> Co-authored-by: Trustin Lee <[email protected]> Co-authored-by: Trustin Lee <[email protected]> Co-authored-by: jrhee17 <[email protected]> Co-authored-by: Ikhun Um <[email protected]>
- Loading branch information
1 parent
460ea02
commit 0019bc7
Showing
8 changed files
with
1,340 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
dependencies { | ||
implementation libs.context.propagation | ||
testImplementation project(':reactor3') | ||
} |
106 changes: 106 additions & 0 deletions
106
...ava/com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Copyright 2024 LINE Corporation | ||
* | ||
* LINE Corporation licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package com.linecorp.armeria.common.micrometer.context; | ||
|
||
import org.reactivestreams.Subscription; | ||
|
||
import com.linecorp.armeria.common.RequestContext; | ||
import com.linecorp.armeria.common.RequestContextStorage; | ||
import com.linecorp.armeria.common.annotation.Nullable; | ||
import com.linecorp.armeria.common.annotation.UnstableApi; | ||
import com.linecorp.armeria.internal.common.RequestContextUtil; | ||
|
||
import io.micrometer.context.ContextRegistry; | ||
import io.micrometer.context.ContextSnapshot; | ||
import io.micrometer.context.ContextSnapshot.Scope; | ||
import io.micrometer.context.ThreadLocalAccessor; | ||
|
||
/** | ||
* This class works with the | ||
* <a href="https://docs.micrometer.io/context-propagation/reference/index.html">Micrometer | ||
* Context Propagation</a> to keep the {@link RequestContext} during | ||
* <a href="https://github.com/reactor/reactor-core">Reactor</a> operations. | ||
* Get the {@link RequestContextThreadLocalAccessor} to register it to the {@link ContextRegistry}. | ||
* Then, {@link ContextRegistry} will use {@link RequestContextThreadLocalAccessor} to | ||
* propagate context during the | ||
* <a href="https://github.com/reactor/reactor-core">Reactor</a> operations | ||
* so that you can get the context using {@link RequestContext#current()}. | ||
* However, please note that you should include Mono#contextWrite(ContextView) or | ||
* Flux#contextWrite(ContextView) to end of the Reactor codes. | ||
* If not, {@link RequestContext} will not be keep during Reactor Operation. | ||
*/ | ||
@UnstableApi | ||
public final class RequestContextThreadLocalAccessor implements ThreadLocalAccessor<RequestContext> { | ||
|
||
private static final Object KEY = RequestContext.class; | ||
|
||
/** | ||
* The value which obtained through {@link RequestContextThreadLocalAccessor}, | ||
* will be stored in the Context under this {@code KEY}. | ||
* This method will be called by {@link ContextSnapshot} internally. | ||
*/ | ||
@Override | ||
public Object key() { | ||
return KEY; | ||
} | ||
|
||
/** | ||
* {@link ContextSnapshot} will call this method during the execution | ||
* of lambda functions in {@link ContextSnapshot#wrap(Runnable)}, | ||
* as well as during Mono#subscribe(), Flux#subscribe(), | ||
* {@link Subscription#request(long)}, and CoreSubscriber#onSubscribe(Subscription). | ||
* Following these calls, {@link ContextSnapshot#setThreadLocals()} is | ||
* invoked to restore the state of {@link RequestContextStorage}. | ||
* Furthermore, at the end of these methods, {@link Scope#close()} is executed | ||
* to revert the {@link RequestContextStorage} to its original state. | ||
*/ | ||
@Nullable | ||
@Override | ||
public RequestContext getValue() { | ||
return RequestContext.currentOrNull(); | ||
} | ||
|
||
/** | ||
* {@link ContextSnapshot} will call this method during the execution | ||
* of lambda functions in {@link ContextSnapshot#wrap(Runnable)}, | ||
* as well as during Mono#subscribe(), Flux#subscribe(), | ||
* {@link Subscription#request(long)}, and CoreSubscriber#onSubscribe(Subscription). | ||
* Following these calls, {@link ContextSnapshot#setThreadLocals()} is | ||
* invoked to restore the state of {@link RequestContextStorage}. | ||
* Furthermore, at the end of these methods, {@link Scope#close()} is executed | ||
* to revert the {@link RequestContextStorage} to its original state. | ||
*/ | ||
@Override | ||
@SuppressWarnings("MustBeClosedChecker") | ||
public void setValue(RequestContext value) { | ||
RequestContextUtil.getAndSet(value); | ||
} | ||
|
||
/** | ||
* This method will be called at the start of {@link ContextSnapshot.Scope} and | ||
* the end of {@link ContextSnapshot.Scope}. If reactor Context does not | ||
* contains {@link RequestContextThreadLocalAccessor#KEY}, {@link ContextSnapshot} will use | ||
* this method to remove the value from {@link ThreadLocal}. | ||
* Please note that {@link RequestContextUtil#pop()} return {@link AutoCloseable} instance, | ||
* but it is not used in `Try with Resources` syntax. this is because {@link ContextSnapshot.Scope} | ||
* will handle the {@link AutoCloseable} instance returned by {@link RequestContextUtil#pop()}. | ||
*/ | ||
@Override | ||
@SuppressWarnings("MustBeClosedChecker") | ||
public void setValue() { | ||
RequestContextUtil.pop(); | ||
} | ||
} |
26 changes: 26 additions & 0 deletions
26
...er-context/src/main/java/com/linecorp/armeria/common/micrometer/context/package-info.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
/* | ||
* Copyright 2024 LINE Corporation | ||
* | ||
* LINE Corporation licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
/** | ||
* Micrometer context-propagation plugins to help keep {@link com.linecorp.armeria.common.RequestContext} | ||
* during Reactor operations. | ||
*/ | ||
@UnstableApi | ||
@NonNullByDefault | ||
package com.linecorp.armeria.common.micrometer.context; | ||
|
||
import com.linecorp.armeria.common.annotation.NonNullByDefault; | ||
import com.linecorp.armeria.common.annotation.UnstableApi; |
158 changes: 158 additions & 0 deletions
158
...com/linecorp/armeria/common/micrometer/context/RequestContextThreadLocalAccessorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
/* | ||
* Copyright 2024 LINE Corporation | ||
* | ||
* LINE Corporation licenses this file to you under the Apache License, | ||
* version 2.0 (the "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at: | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package com.linecorp.armeria.common.micrometer.context; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.assertj.core.api.Assertions.assertThatThrownBy; | ||
|
||
import org.junit.jupiter.api.Test; | ||
|
||
import com.linecorp.armeria.client.ClientRequestContext; | ||
import com.linecorp.armeria.common.HttpMethod; | ||
import com.linecorp.armeria.common.HttpRequest; | ||
import com.linecorp.armeria.common.RequestContext; | ||
import com.linecorp.armeria.internal.common.RequestContextUtil; | ||
|
||
import io.micrometer.context.ContextRegistry; | ||
import io.micrometer.context.ContextSnapshot; | ||
import io.micrometer.context.ContextSnapshot.Scope; | ||
import io.micrometer.context.ContextSnapshotFactory; | ||
|
||
class RequestContextThreadLocalAccessorTest { | ||
|
||
@Test | ||
void should_return_expected_key() { | ||
// Given | ||
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); | ||
final Object expectedValue = RequestContext.class; | ||
|
||
// When | ||
final Object result = reqCtxAccessor.key(); | ||
|
||
// Then | ||
assertThat(result).isEqualTo(expectedValue); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("MustBeClosedChecker") | ||
void should_success_set() { | ||
// Given | ||
final ClientRequestContext ctx = newContext(); | ||
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); | ||
|
||
// When | ||
reqCtxAccessor.setValue(ctx); | ||
|
||
// Then | ||
final RequestContext currentCtx = RequestContext.current(); | ||
assertThat(currentCtx).isEqualTo(ctx); | ||
|
||
RequestContextUtil.pop(); | ||
} | ||
|
||
@Test | ||
void should_throw_NPE_when_set_null() { | ||
// Given | ||
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); | ||
|
||
// When + Then | ||
assertThatThrownBy(() -> reqCtxAccessor.setValue(null)) | ||
.isInstanceOf(NullPointerException.class); | ||
} | ||
|
||
@Test | ||
void should_be_null_when_setValue() { | ||
// Given | ||
final ClientRequestContext ctx = newContext(); | ||
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); | ||
reqCtxAccessor.setValue(ctx); | ||
|
||
// When | ||
reqCtxAccessor.setValue(); | ||
|
||
// Then | ||
final RequestContext reqCtx = RequestContext.currentOrNull(); | ||
assertThat(reqCtx).isNull(); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("MustBeClosedChecker") | ||
void should_be_restore_original_state_when_restore() { | ||
// Given | ||
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); | ||
final ClientRequestContext previousCtx = newContext(); | ||
final ClientRequestContext currentCtx = newContext(); | ||
reqCtxAccessor.setValue(currentCtx); | ||
|
||
// When | ||
reqCtxAccessor.restore(previousCtx); | ||
|
||
// Then | ||
final RequestContext reqCtx = RequestContext.currentOrNull(); | ||
assertThat(reqCtx).isNotNull(); | ||
assertThat(reqCtx).isEqualTo(previousCtx); | ||
|
||
RequestContextUtil.pop(); | ||
} | ||
|
||
@Test | ||
void should_be_null_when_restore() { | ||
// Given | ||
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); | ||
final ClientRequestContext currentCtx = newContext(); | ||
reqCtxAccessor.setValue(currentCtx); | ||
|
||
// When | ||
reqCtxAccessor.restore(); | ||
|
||
// Then | ||
final RequestContext reqCtx = RequestContext.currentOrNull(); | ||
assertThat(reqCtx).isNull(); | ||
} | ||
|
||
@Test | ||
void requestContext_should_exist_inside_scope_and_not_outside() { | ||
// Given | ||
final RequestContextThreadLocalAccessor reqCtxAccessor = new RequestContextThreadLocalAccessor(); | ||
ContextRegistry.getInstance() | ||
.registerThreadLocalAccessor(reqCtxAccessor); | ||
final ClientRequestContext currentCtx = newContext(); | ||
final ClientRequestContext expectedCtx = currentCtx; | ||
reqCtxAccessor.setValue(currentCtx); | ||
|
||
final ContextSnapshotFactory factory = ContextSnapshotFactory.builder() | ||
.clearMissing(true) | ||
.build(); | ||
final ContextSnapshot contextSnapshot = factory.captureAll(); | ||
reqCtxAccessor.setValue(); | ||
|
||
// When : contextSnapshot.setThreadLocals() | ||
try (Scope ignored = contextSnapshot.setThreadLocals()) { | ||
|
||
// Then : should not | ||
final RequestContext reqCtxInScope = RequestContext.currentOrNull(); | ||
assertThat(reqCtxInScope).isSameAs(expectedCtx); | ||
} | ||
|
||
// Then | ||
final RequestContext reqCtxOutOfScope = RequestContext.currentOrNull(); | ||
assertThat(reqCtxOutOfScope).isNull(); | ||
} | ||
|
||
static ClientRequestContext newContext() { | ||
return ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); | ||
} | ||
} |
Oops, something went wrong.