[Webflux] Mono의 사용에 따라서 달라지는 리턴 값 & Binary Input Data의 처리
String으로 바로 반환 하는 것과 Mono<String> 의 형태로 바로 반환하는 것과 어떤 차이가 있을까요?
@RestController
public class TesterController {
@GetMapping("/")
Mono<String> hello() {
return Mono.just("Hello Webflux");
}
}
@RestController
public class TesterController {
@GetMapping("/")
String hello() {
return ("Hello Webflux");
}
}
모노로 리턴하면 Publisher의 형태로 만들어진 객체를 Spring이 자동으로 onSubscribe 처리를 하는데 Mono가 만든 Publisher를 구독하고 이를 onNext로 진행해서 내가 진행한 Stream의 데이터가 더 있는지 확인하고 다른 처리가 없다면 onComplete() or onError()를 내새울 것입니다.
아래와 같이 말이죠.
2024-01-30T22:13:54.575+09:00 INFO 16648 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2024-01-30T22:13:54.576+09:00 INFO 16648 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | request(unbounded)
2024-01-30T22:13:54.576+09:00 INFO 16648 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onNext(Hello Webflux)
2024-01-30T22:13:54.576+09:00 INFO 16648 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onComplete()
이 흐름이 중요할까요? 같이 보시죠
@RestController
public class TesterController {
@GetMapping("/")
Mono<String> hello() {
log.info("pos1");
Mono m = Mono.just("Hello Webflux").log();
log.info("pos2");
return m;
}
}
그런데 실행하면 이렇게 로그가 나옵니다.
2024-01-30T22:20:07.557+09:00 INFO 21124 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : pos1
2024-01-30T22:20:07.558+09:00 INFO 21124 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : pos2
2024-01-30T22:20:07.566+09:00 INFO 21124 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2024-01-30T22:20:07.567+09:00 INFO 21124 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | request(unbounded)
2024-01-30T22:20:07.567+09:00 INFO 21124 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onNext(Hello Webflux)
2024-01-30T22:20:07.567+09:00 INFO 21124 --- [nio-8080-exec-1] reactor.Mono.Just.1 : | onComplete()
왜 이런 결과가 나왔을까요? 저는 m 객체에 먼저 Mono.just().log()해서 출력하는 것을 먼저 실행 시켰는데 말이죠.
제가 왜 Mono m 라인이 실행 되는 동안에 실행되지 않을까요? 지금 보면 Controller의 다른 메서드가 먼저 실행이 되고 Mono의 코드가 실행이 되네요. 코드 자체는 동기적으로 진행되는 코드인데도 왜 이런식으로 실행이 될까요?
스프링에서 Mono를 구독해야만 실행이 되니까 그런 것인데요. 이를 이렇게 보면 마치 비동기 처럼 돌아가는 것 처럼 보입니다.
하지만, 그런 것은 아니고 Publisher가 퍼블리쉬를 해도 Subscriber가 구독을 해야만 실행이 되는 것이죠.
@GetMapping("/")
Mono<String> hello() {
log.info("pos1");
Mono m = Mono
.fromSupplier(() -> generateHello()) //Parameter는 없고 Return 값만 존재하는 객체
.doOnNext(c -> log.info(c)).log();
log.info("pos2");
return m;
}
이건 어떻게 실행이 될까요?
2024-01-30T22:29:33.258+09:00 INFO 3100 --- [nio-8080-exec-2] c.t.tester.controller.TesterController : pos1
2024-01-30T22:29:33.258+09:00 INFO 3100 --- [nio-8080-exec-2] c.t.tester.controller.TesterController : method generateHello()
2024-01-30T22:29:33.260+09:00 INFO 3100 --- [nio-8080-exec-2] c.t.tester.controller.TesterController : pos2
2024-01-30T22:29:33.266+09:00 INFO 3100 --- [nio-8080-exec-2] reactor.Mono.PeekFuseable.1 : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2024-01-30T22:29:33.267+09:00 INFO 3100 --- [nio-8080-exec-2] reactor.Mono.PeekFuseable.1 : | request(unbounded)
2024-01-30T22:29:33.267+09:00 INFO 3100 --- [nio-8080-exec-2] c.t.tester.controller.TesterController : Hello Message
2024-01-30T22:29:33.267+09:00 INFO 3100 --- [nio-8080-exec-2] reactor.Mono.PeekFuseable.1 : | onNext(Hello Message)
2024-01-30T22:29:33.267+09:00 INFO 3100 --- [nio-8080-exec-2] reactor.Mono.PeekFuseable.1 : | onComplete()
어렵죠 하지만 결과물은 알겠습니다. 메서드 함수 자체가 Mono로 싸져서 들어간다고 해도 이것이 Mono가 Subscribe 될 때 실행되는 것이 아니라 자체적으로 이미 실행을 해버리고 이를 Mono를 구독할 때 넘겨서 작업 해버리네요.
그러면 함수를 Mono 객체 안에서 서로 체이닝으로 같이 실행되게 만들고 싶으면 어떻게 해야 할까요? 아래와 같이 fromSupplier라는 아이템을 가지고 파라미터가 없이 리턴 값만 가지는 오브젝트를 넘기면 퍼블리쉬로 부터 Subscribe 요청이 올때 까지 기다렸다가 Chaining 형태로 실행이 되어져 버립니다.
@GetMapping("/")
Mono<String> hello() {
log.info("pos1");
Mono m = Mono
.fromSupplier(() -> generateHello()) //Parameter는 없고 Return 값만 존재하는 객체
.doOnNext(c -> log.info(c)).log();
log.info("pos2");
return m;
}
2024-01-30T22:32:48.119+09:00 INFO 1708 --- [nio-8080-exec-3] c.t.tester.controller.TesterController : pos1
2024-01-30T22:32:48.119+09:00 INFO 1708 --- [nio-8080-exec-3] c.t.tester.controller.TesterController : pos2
2024-01-30T22:32:48.119+09:00 INFO 1708 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.2 : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2024-01-30T22:32:48.120+09:00 INFO 1708 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.2 : | request(unbounded)
2024-01-30T22:32:48.120+09:00 INFO 1708 --- [nio-8080-exec-3] c.t.tester.controller.TesterController : method generateHello()
2024-01-30T22:32:48.120+09:00 INFO 1708 --- [nio-8080-exec-3] c.t.tester.controller.TesterController : Hello Message
2024-01-30T22:32:48.120+09:00 INFO 1708 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.2 : | onNext(Hello Message)
2024-01-30T22:32:48.120+09:00 INFO 1708 --- [nio-8080-exec-3] reactor.Mono.PeekFuseable.2 : | onComplete()
자, 그러면 m.subscribe() 호출 후 return m;을 하면 어떻게 될까요?
@GetMapping("/")
Mono<String> hello() {
log.info("pos1");
Mono m = Mono
.fromSupplier(() -> generateHello()) //Parameter는 없고 Return 값만 존재하는 객체
.doOnNext(c -> log.info(c)).log();
m.subscribe();
log.info("pos2");
return m;
}
2024-01-30T22:39:39.184+09:00 INFO 11080 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : pos1
2024-01-30T22:39:39.188+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2024-01-30T22:39:39.189+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | request(unbounded)
2024-01-30T22:39:39.189+09:00 INFO 11080 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : method generateHello()
2024-01-30T22:39:39.189+09:00 INFO 11080 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : Hello Message
2024-01-30T22:39:39.189+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onNext(Hello Message)
2024-01-30T22:39:39.189+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onComplete()
2024-01-30T22:39:39.189+09:00 INFO 11080 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : pos2
2024-01-30T22:39:39.193+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2024-01-30T22:39:39.193+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | request(unbounded)
2024-01-30T22:39:39.193+09:00 INFO 11080 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : method generateHello()
2024-01-30T22:39:39.193+09:00 INFO 11080 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : Hello Message
2024-01-30T22:39:39.193+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onNext(Hello Message)
2024-01-30T22:39:39.193+09:00 INFO 11080 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onComplete()
Subscriber가 여러번 구독을 해도 된다는 것을 알 수 있네요... Publisher는 한 번 발행했는데 Subscribe는 여러번 할 수가 있기 때문에 이렇게 한 번 더 Subscribe 하는 시점에서 실행을 한 번 더 하네요.
그리고, 한 번 더 할 수 있는 실수를 알아볼까요?
모노를 정의하고 block이라는 작업을 통해서 저는 String 객체 값만 가져와서 보고 싶은데요. 이를 이렇게 써버리면
아래와 같이 서브스크라이브가 처리가 먼저 block에서 실행이 되어져버리고 return m;을 할 때 실행을 한 번 더 해버립니다.
@GetMapping("/")
Mono<String> hello() {
log.info("pos1");
Mono<String> m = Mono
.fromSupplier(() -> generateHello()) //Parameter는 없고 Return 값만 존재하는 객체
.doOnNext(c -> log.info(c)).log();
String msg2 = m.block();
log.info("pos2 " + msg2);
return m;
}
private String generateHello() {
log.info("method generateHello()");
return "Hello Message";
}
2024-01-30T23:00:01.752+09:00 INFO 3232 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : pos1
2024-01-30T23:00:01.756+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2024-01-30T23:00:01.756+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | request(unbounded)
2024-01-30T23:00:01.757+09:00 INFO 3232 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : method generateHello()
2024-01-30T23:00:01.757+09:00 INFO 3232 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : Hello Message
2024-01-30T23:00:01.757+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onNext(Hello Message)
2024-01-30T23:00:01.757+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onComplete()
2024-01-30T23:00:01.758+09:00 INFO 3232 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : pos2 Hello Message
2024-01-30T23:00:01.762+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)
2024-01-30T23:00:01.762+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | request(unbounded)
2024-01-30T23:00:01.762+09:00 INFO 3232 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : method generateHello()
2024-01-30T23:00:01.762+09:00 INFO 3232 --- [nio-8080-exec-1] c.t.tester.controller.TesterController : Hello Message
2024-01-30T23:00:01.762+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onNext(Hello Message)
2024-01-30T23:00:01.762+09:00 INFO 3232 --- [nio-8080-exec-1] reactor.Mono.PeekFuseable.1 : | onComplete()
+) 만일, 리엑티브 스타일로 DB save를 하고 모노는 따로 리턴 안하게 되면 어떻게 될까요?
- 구독을 안해버려서 DB에 저장이 되지 않습니다.
이런 구독에 따라서 달라지는 순서 및 실수 할 수 있는 사항에 대해서 따져 봐야 할 것 같습니다.
Binary 데이터를 받아서 사용 할 때 좋은 모듈 소개
DataBuffer로 Netty에서는 InputStream 객체를 받아서 넘기는데 이를 이용해서 Flux 형태로 쌓인 byte[]으로 변환하고 이를 flatMap으로 펼쳐서 byte[] 그대로 이용해서 검증 또는 실행을 수행합니다.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.test.tester.domain.TestDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.*;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
@Slf4j
@RestController
public class TesterController {
@PostMapping("/test")
public Mono<Void> handleRequest(ServerWebExchange serverWebExchange) {
ServerHttpRequest serverHttpRequest = serverWebExchange.getRequest();
Flux<DataBuffer> body = serverHttpRequest.getBody();
return DataBufferUtils.join(body)
.flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return Mono.just(bytes);
})
.flatMap(res -> {
ObjectMapper objectMapper = new ObjectMapper();
try {
TestDTO testDTO = objectMapper.readValue(res, TestDTO.class);
log.info(testDTO.getX() + " " + serverHttpRequest.getHeaders().get("TEST"));
return Mono.empty();
} catch (Exception e) {
return Mono.error(e);
}
});
}
}
[코드 공유 - Binary 모듈]
[코드 공유 - Mono 순서]
https://github.com/1ComputerMaster/1ComputerMaster/tree/main/Study_WebFlux
[함께 보면 좋은 자료]
- Webflux까지 왜 사용을 하게되는지 강의를 통해서 이해 할 수 있습니다.
- Webflux 자체에 대해 깊이 있는 이해를 위해서 필요함을 느껴서 정리된 자료를 살펴보았습니다.
- 결론은 log()가 Blocking 메서드라 오히려 WebMVC 보다 느렸다는 것, 많은 map은 동기 처리 부분의 메서드이며 이 map 메서드 호출은 객체 생성을 계속 사용을 하므로 GC 시간을 늘립니다. 즉, 불필요하게 map으로 지속적으로 늘릴 필요가 없다는 것이죠
- flatMap은 비동기식으로 동작하며 불필요하게 지속적인 호출을 줄인다. 즉, 비동기식으로 실행이 필요한 경우 flatMap으로 사용을 권장합니다. (실제로는 flatMap은 객체를 Mono에서 푸는 역할만 하는데 아마도 발표자 분이 빠르게 설명하면서 객체를 열어서 바로 작성하기 때문에 Map 보다 비용이 싸다는 말씀을 하고 싶었던 거 같습니다. 비동기 식으로 실행이 된다는 설명은 틀린 설명 같습니다.)
[참고한 블로그]
Read request body in Spring Webflux Webfilter
This is one of the interesting problems that I came across recently while developing a feature. I googled it a bit, but none of them worked…