Skip to content

Latest commit

 

History

History
217 lines (175 loc) · 8.17 KB

README.md

File metadata and controls

217 lines (175 loc) · 8.17 KB

Spring - Stream HTTP Response from JPA

Alexander Partsch [email protected] | 28th of August, 2020 - 06:00 in the Morning | @wtfjohngalt on Twitter

While working on a web service sending 10.000+ records to a client for further processing, we realized the connection and handling logic of classical pagination would impose an untenable performance penalty on the user. We knew about JSON Streaming as an alternative, but never used it up until know. In the following this text explains how to implement a streaming HTTP endpoint handler with the Spring Framework. We'll even stream records fetched via JPA and see how the connection handling is ensured.

StreamingResponseBody

The Spring MVC class StreamingResponseBody is a functional interface for asynchronous request processing. Even simpler: It's a Consumer of an OutputStream one returns in a Spring MVC request handler. The OutputStream points to the Servlets request body.

@RestController
class StreamingResponseBodyExample {
    @GetMapping(path = "/")
    StreamingResponseBody stream() {
        return out -> {
            try (var writer = new PrintWriter(out)) {
                writer.println("Hello, World!");
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }
}

Streaming JSON

When using StreamingResponseBody one cannot expect Spring to serialize the response body to the desired representation format anymore, since the handler writes directly to the Servlets response OutputStream. Therefore I would employ JsonFactory with any ObjectMapper of choice as codec to create a JsonGenerator, which can be used to serialize on the fly. In the following example a list of POJOs is converted to a JSON array in the response body:

@RestController
class StreamingJsonExample {

    private static final JsonFactory JSON_FACTORY =
        new JsonFactory().setCodec(new ObjectMapper());

    @Autowired
    private DataService service;

    @GetMapping(path = "/", 
        produces = {MediaType.APPLICATION_STREAM_JSON_VALUE}) // 1
    StreamingResponseBody stream() {
        return out -> {
            var data = service.loadData();
            try (var json = JSON_FACTORY.createGenerator(out)) {
                json.writeStartArray();
                for(var item : data) {
                    json.writeObject(item);
                 }
                json.writeEndArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }
}

//1 We need to specify the response body ourselves.

Connecting to JPA

Springs JPA implementation supports streaming query results, if the datasource allows so. This means the repository methods return a java.util.Stream instead of a collection. Records are fetched as demanded. Since the StreamingResponseBody is executed after our MVC request handler returned, we need to make a sure the transaction is kept alive until the stream completed.

Therefore Spring defined the OpenEntityManagerInViewFilter providing the database connection / transaction to any newly spawned thread and properly cleaning up afterwards. We just have to make sure the response body is executed under a transaction. We can easily achive this with a service layer class that applies a @Transactional annotation (make sure neither class nor method are final or non-public):

@Service
public class StreamingService {

    private static final JsonFactory JSON_FACTOR = new JsonFactory()
            .setCodec(new ObjectMapper());

    @Autowired
    private FakeDataRepository repository;

    @Transactional(readOnly = true)
    public void stream(OutputStream outputStream) throws IOException {
        try(var json = JSON_FACTOR.createGenerator(outputStream)) {
            json.writeStartArray();
            repository.findByIdNotNull()
                    .forEach(pojo -> {
                        try {
                            json.writeObject(pojo);
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });
            json.writeEndArray();
        }
    }
}

The controller is simply referencing it:

@RestController
@SpringBootApplication
public class SpringRestStreamFromDatabaseApplication {

	@Autowired
	private StreamingService streamingService;

	public static void main(String[] args) {
		SpringApplication.run(SpringRestStreamFromDatabaseApplication.class, args);
	}

	@GetMapping(path = "/stream", produces = {MediaType.APPLICATION_STREAM_JSON_VALUE})
	public StreamingResponseBody stream() {
		return streamingService::stream;
	}

}

The JPA repository is even simpler:

@Repository
public interface FakeDataRepository extends JpaRepository<FakeDataEntity, UUID> {

    Stream<FakeDataEntity> findByIdNotNull();

}

Testing Streaming Endpoints

Testing streaming endpoints with MockMvc differs since you need to keep the connection/request handler alive until the whole response was red. Spring therefore offers the RequestBuilder asyncDispatch to wait for the connection to close:

@SpringBootTest
@ActiveProfiles({"test"})
class SpringRestStreamFromDatabaseApplicationTests {

    private MockMvc api;

    @Autowired
    @BeforeEach
    void setup(WebApplicationContext wac) {
        api = MockMvcBuilders.webAppContextSetup(wac)
                .build();
    }

    @Test
    @DisplayName("Stream Response from Database")
    void shouldStreamResponseFromDatabase() throws Exception {
        // Act
        var async = api.perform(get("/stream"))
                .andExpect(request().asyncStarted())
                .andDo(MvcResult::getAsyncResult)
                .andReturn();
        api.perform(asyncDispatch(async))
                .andDo(print())
                .andExpect(status().isOk());

    }

}

Connection Pool Leak < Spring Boot 2.3

As you can read in this issue, there was a connection leakage bug in OpenEntityManagerInViewFilter not properly dispatching the threads and JPA transactins. This can be fixed by registering the filter anew with the ASYNC DispatchType:

 @Test 
 void startupWithOncePerRequestDefaults() throws Exception { 
 	FilterRegistrationBean<?> bean = new FilterRegistrationBean<>(this.oncePerRequestFilter); 
 	bean.onStartup(this.servletContext); 
 	verify(this.servletContext).addFilter(eq("oncePerRequestFilter"), eq(this.oncePerRequestFilter)); 
 	verify(this.registration).setAsyncSupported(true); 
 	verify(this.registration).addMappingForUrlPatterns(EnumSet.allOf(DispatcherType.class), false, "/*"); 
 } 

Conclusion

Support for JSON Streaming in Spring comes nearly out of the box. Using the JsonGenerator and knowing about the OpenEntityManagerInView pattern is all you need to move on. A complexity decrease in your client code as well as performance improvements should appply if done correctly.