Alexander Partsch [email protected] | 28th of August, 2020 - 06:00 in the Morning | @wtfjohngalt on Twitter
While working on a web service that sends 10,000+ records to a client for further processing, we realized that the connection and handling logic of classical pagination would impose an untenable performance penalty on the user. We knew about JSON streaming as an alternative, but had never used it until now. This text explains how to implement a streaming HTTP endpoint handler with the Spring Framework. We'll also stream records fetched via JPA and see how the connection handling is ensured.
The Spring MVC type StreamingResponseBody is a functional interface for
asynchronous request processing. Even simpler: it is a consumer of an
OutputStream that one returns from a Spring MVC request handler. The
OutputStream points to the Servlet's response body.
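import java.io.PrintWriter;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
class StreamingResponseBodyExample {

    @GetMapping(path = "/")
    StreamingResponseBody stream() {
        return out -> {
            // PrintWriter never throws IOException, so catching it here
            // would not even compile; closing the writer flushes the output.
            try (var writer = new PrintWriter(out)) {
                writer.println("Hello, World!");
            }
        };
    }
}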
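To make the "consumer of an OutputStream" idea tangible without a servlet
container, here is a minimal JDK-only sketch. BodyWriter and BodyWriterSketch
are hypothetical stand-ins and not part of Spring: BodyWriter merely mirrors
the single writeTo(OutputStream) method of StreamingResponseBody, and a
ByteArrayOutputStream plays the role of the response stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in mirroring the shape of StreamingResponseBody:
// one method that receives the response stream and may throw IOException.
@FunctionalInterface
interface BodyWriter {
    void writeTo(OutputStream out) throws IOException;
}

class BodyWriterSketch {

    // Plays the role of the request handler: it only returns the callback.
    static BodyWriter hello() {
        return out -> {
            var writer = new PrintWriter(out, false, StandardCharsets.UTF_8);
            writer.print("Hello, World!");
            writer.flush(); // flush, but leave closing the stream to the caller
        };
    }

    public static void main(String[] args) throws IOException {
        var response = new ByteArrayOutputStream(); // stands in for the response stream
        BodyWriter body = hello();                  // the "handler" has returned by now
        System.out.println("bytes before writeTo: " + response.size());
        body.writeTo(response);                     // a framework would call this later
        System.out.println(response.toString(StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is the ordering: the handler only hands back a
callback, and nothing reaches the stream until that callback is invoked.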
When using StreamingResponseBody, one can no longer expect Spring to serialize
the response body to the desired representation format, since the handler
writes directly to the Servlet's response OutputStream. Therefore I would
employ a JsonFactory with any ObjectMapper of choice as codec to create a
JsonGenerator, which can be used to serialize on the fly. In the following
example a list of POJOs is converted to a JSON array in the response body:
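import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
class StreamingJsonExample {

    // The ObjectMapper codec lets the generator serialize arbitrary POJOs.
    private static final JsonFactory JSON_FACTORY =
            new JsonFactory().setCodec(new ObjectMapper());

    @Autowired
    private DataService service;

    @GetMapping(path = "/",
            produces = {MediaType.APPLICATION_STREAM_JSON_VALUE}) // 1
    StreamingResponseBody stream() {
        return out -> {
            var data = service.loadData();
            // The callback may throw IOException itself, so no wrapping is needed.
            try (var json = JSON_FACTORY.createGenerator(out)) {
                json.writeStartArray();
                for (var item : data) {
                    json.writeObject(item);
                }
                json.writeEndArray();
            }
        };
    }
}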
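//1 We need to specify the response content type ourselves. Be aware that APPLICATION_STREAM_JSON_VALUE is deprecated as of Spring Framework 5.3; since the payload here is a single JSON array, MediaType.APPLICATION_JSON_VALUE should describe it equally well.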
Spring Data JPA supports streaming query results, if the datasource allows it.
This means the repository methods return a java.util.stream.Stream instead of
a collection. Records are fetched on demand.
Since the StreamingResponseBody is executed after our MVC request handler has
returned, we need to make sure the transaction is kept alive until the stream
has completed.
For this purpose Spring provides the OpenEntityManagerInViewFilter, which
makes the database connection / transaction available to any newly spawned
thread and properly cleans up afterwards. We just have to make sure the
response body is executed under a transaction. We can easily achieve this with
a service layer class that applies a @Transactional annotation (make sure
neither the class nor the method is final or non-public):
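import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class StreamingService {

    private static final JsonFactory JSON_FACTORY = new JsonFactory()
            .setCodec(new ObjectMapper());

    @Autowired
    private FakeDataRepository repository;

    @Transactional(readOnly = true)
    public void stream(OutputStream outputStream) throws IOException {
        // Close the JPA stream as well, so the underlying cursor is released.
        try (var json = JSON_FACTORY.createGenerator(outputStream);
             var rows = repository.findByIdNotNull()) {
            json.writeStartArray();
            rows.forEach(pojo -> {
                try {
                    json.writeObject(pojo);
                } catch (IOException e) {
                    // forEach cannot propagate checked exceptions
                    throw new UncheckedIOException(e);
                }
            });
            json.writeEndArray();
        }
    }
}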
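The remark about final members follows from how a class without interfaces
usually gets its transactional behaviour: it is wrapped in a generated
subclass that overrides the annotated method, and a final class or method
cannot be overridden. The following JDK-only sketch imitates that by hand.
ReportService, TransactionalReportServiceProxy and the recorded "begin" and
"commit" events are made up for illustration; no Spring API is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical service; stands in for a class carrying @Transactional.
class ReportService {
    public String stream() {
        return "rows";
    }
}

// Hand-written imitation of a subclass-based proxy.
class TransactionalReportServiceProxy extends ReportService {

    final List<String> events = new ArrayList<>();

    @Override
    public String stream() {
        events.add("begin");        // a real proxy would open the transaction here
        try {
            return super.stream();  // delegate to the actual business logic
        } finally {
            events.add("commit");   // and complete the transaction here
        }
    }
}

class ProxySketch {
    public static void main(String[] args) {
        var proxy = new TransactionalReportServiceProxy();
        ReportService service = proxy; // callers only see the service type
        System.out.println(service.stream() + " " + proxy.events);
        // prints: rows [begin, commit]
    }
}
```

Declaring ReportService.stream() as final would make the override, and with
it the wrapping, impossible, which is the situation the parenthesis warns about.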
The controller simply delegates to the service:
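import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
@SpringBootApplication
public class SpringRestStreamFromDatabaseApplication {

    @Autowired
    private StreamingService streamingService;

    public static void main(String[] args) {
        SpringApplication.run(SpringRestStreamFromDatabaseApplication.class, args);
    }

    @GetMapping(path = "/stream", produces = {MediaType.APPLICATION_STREAM_JSON_VALUE})
    public StreamingResponseBody stream() {
        // The method reference matches writeTo(OutputStream) throws IOException.
        return streamingService::stream;
    }
}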
The JPA repository is even simpler:
@Repository
public interface FakeDataRepository extends JpaRepository<FakeDataEntity, UUID> {

    Stream<FakeDataEntity> findByIdNotNull();
}
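A Stream returned by a repository method like this typically keeps a database
cursor open until the stream is closed, and a terminal operation such as
forEach does not close it. The sketch below shows the general pattern with
plain JDK types: consume the stream inside try-with-resources and translate
the IOException that cannot escape the lambda. ClosingStreamSketch,
writeJsonArray and the integer "rows" are assumptions for illustration; the
onClose hook stands in for releasing the cursor.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class ClosingStreamSketch {

    // Writes each number as one element of a JSON array, one row at a time.
    static void writeJsonArray(Stream<Integer> source, OutputStream out) throws IOException {
        try (Stream<Integer> rows = source) {       // closed even if writing fails
            out.write('[');
            var first = new AtomicBoolean(true);
            rows.forEach(row -> {
                try {
                    if (!first.getAndSet(false)) {
                        out.write(',');             // separator before every row but the first
                    }
                    out.write(row.toString().getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e); // tunnel through forEach
                }
            });
            out.write(']');
        } catch (UncheckedIOException e) {
            throw e.getCause();                     // surface the original IOException
        }
    }

    public static void main(String[] args) throws IOException {
        var closed = new AtomicBoolean(false);
        Stream<Integer> rows = Stream.of(1, 2, 3).onClose(() -> closed.set(true));
        var out = new ByteArrayOutputStream();
        writeJsonArray(rows, out);
        System.out.println(out.toString(StandardCharsets.UTF_8) + " closed=" + closed.get());
        // prints: [1,2,3] closed=true
    }
}
```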
Testing streaming endpoints with MockMvc differs, since you need to keep the
connection/request handler alive until the whole response has been read. Spring
therefore offers the asyncDispatch request builder, which continues the
request once the asynchronous processing has produced its result:
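import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;

@SpringBootTest
@ActiveProfiles({"test"})
class SpringRestStreamFromDatabaseApplicationTests {

    private MockMvc api;

    @Autowired
    @BeforeEach
    void setup(WebApplicationContext wac) {
        api = MockMvcBuilders.webAppContextSetup(wac)
                .build();
    }

    @Test
    @DisplayName("Stream Response from Database")
    void shouldStreamResponseFromDatabase() throws Exception {
        // Act: start the request and wait for the asynchronous result
        var async = api.perform(get("/stream"))
                .andExpect(request().asyncStarted())
                .andDo(MvcResult::getAsyncResult)
                .andReturn();
        // Assert: dispatch again to obtain the written response
        api.perform(asyncDispatch(async))
                .andDo(print())
                .andExpect(status().isOk());
    }
}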
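As you can read in this issue, there was a connection leakage bug in
OpenEntityManagerInViewFilter not properly dispatching the threads and JPA
transactions. This can be fixed by registering the filter anew with the ASYNC
DispatcherType. The test below verifies that a filter registered through
FilterRegistrationBean gets async support and is mapped for all dispatcher
types, ASYNC included: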
@Test
void startupWithOncePerRequestDefaults() throws Exception {
    FilterRegistrationBean<?> bean = new FilterRegistrationBean<>(this.oncePerRequestFilter);
    bean.onStartup(this.servletContext);
    verify(this.servletContext).addFilter(eq("oncePerRequestFilter"), eq(this.oncePerRequestFilter));
    verify(this.registration).setAsyncSupported(true);
    verify(this.registration).addMappingForUrlPatterns(EnumSet.allOf(DispatcherType.class), false, "/*");
}
Support for JSON streaming in Spring comes nearly out of the box. Using the
JsonGenerator and knowing about the OpenEntityManagerInView pattern is all you
need to get going. Done correctly, this should reduce the complexity of your
client code and improve performance.