Webflux기본 코드로 달려
0개 또는 1개의 결과를 처리
Mono.just()
Mono<String> monoString = Mono.just("Hello world!");
Iterable객체를 처리
Flux.fromIterable()
List<Integer> numbers = Arrays.asList(1,2,3,4,5);
Flux<Integer> fluxNumbers = Flux.fromIterable(numbers);
비동기 처리후 결과를 Mono로
Mono.fromCallable()
Callable<String> callableTask = () -> "Result from callable task";
Mono<String> monoFromCallable = Mono.fromCallable(callableTask);
여러개의 Flux를 순차적으로 연결
Flux.concat()
Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(4,5,6);
Flux<Integer> concatenatedFlux = Flux.concat(flux1, flux2);
여거래의 Flux를 병합하여 병렬처리
Flux.merge()
Flux<Integer> flux1 = Flux.just(1,3,5);
Flux<Integer> flux2 = Flux.just(2,4,6);
Flux<Integer> mergeFlux = Flux.merge(flux1, flux2);
Mono결합 -> 결과기다림 -> 튜플반환
Mono.zip()
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("Webflux");
Mono<Tuple2<String, String>> zippedMono = Mono.zip(mono1, mono2);
Flux각 요소에 함수를 적용하여 변환
.map()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<String> mappedNumbers = numbers.map(num -> "Number: " + num);
Mono의 값을 조건에 따라 필터링함
.filter()
Mono<Integer> numberMono = Mono.just(5);
Mono<Integer> filteredMono = numberMono.filter(num -> num > 3);
Flux의 각 요소에 비동기 함수를 적용후 결과Flux를 병합
.flatMap()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> doubledNumbers = numbers.flatMap(num -> Mono.just(num*2));
Flux가 비어있을 때 대체 Flux를 사용
.switchIfEmpty()
Flux<Integer> numbers = Flux.empty();
Flux<Integer> defaultNumbers = Flux.just(1,2,3);
Flux<Integer> result = numbers.switchIfEmpty(defaultNumbers); // result: 1,2,3
Mono에서 에러가 발생 했을때 대체 Mono를 사용
.onErrorResume()
Mono<Integer> numberMono = Mono.just(10)
.map(num -> num / 0)
.onErrorResume(ex -> Mono.just(0));
Flux에서 발생시 조건에 따라 에러무시하고 진행
.onErrorContinue()
Flux<Integer> numbers = Flux.just(1,2,3,4);
Flux<Integer> result = numbers.map(num -> {
if (num == 3) {
throw new RuntimeException("Error!);
}
return num;
})
.onErrorContinue((ex, val) -> {
System.out.println("Error occurred, but continuing with value : " + val);
});
Mono나 Flux에서 각가의 값을 소비하기 전에 사전 처리 수행
.doOnSuccess()
Mono<Integer> numberMono = Mono.just(42);
numberMono = numberMono.doOnSuccess(num -> System.out.println("처리"));
Flux에서 값을 집계하여 단일 결과 Mono로 반환
.reduce()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Mono<Integer> sumMono = numbers.reduce(0, (acc,num) -> acc + num);
Flux의 모든 요소를 리스트나 맵으로 수집
.collectMap()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Mono<List<Integer>> listMono = numbers.collectList();
Mono<Map<Integer, String>> mapMono = numbers.collectMap(num -> num, num -> "Value : " + num);
주기적으로 값을 생성하는 Flux를 생성
.interval()
Flux<Long> intervalFlux = Flux.interval(Duration.OfSeconds(1));
intervalFlux.subscribe(value -> System.out.println("Received value : " + value));
두 개의 Flux를 조합하여 새로운 Flux를 생성
.zipWith()
Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(10,20,30);
Flux<Tuple2<Integer, Integer>> zippedFlux = flux1.zipWith(flux2);
지정된 개수만큼의 요소를 모아 List형태로 발행
.buffer()
Flux<Integer> numbers = Flux.range(1, 10);
Flux<List<Integer>> bufferedFlux = numbers.buffer(3);
// Output: [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
지정된 개수만큼의 요소를 가진 작은 Flux들로 구분하여 발행
.window()
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Flux<Integer>> windowedFlux = numbers.window(3);
// Output: Flux[1, 2, 3], Flux[4, 5, 6], Flux[7, 8, 9], Flux[10]
에러가 발생할 경우 재시도
.retry()
Flux<Integer> fluxWithRetry = someFlux.retry(3) // 최대 3번 재시도
Flux각 요소에 비동기 함수 적용, 결과Flux 순차처리
.concatMap()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.concatMap(num -> Mono.just(num * 2));
Flux각 요소에 비동기 함수 적용, 결과 Flux병렬처리. 순서보장x
.flatMapSequential()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.flatMapSequential(num -> Mono.just(num * 2));
Mono나 Flux실행완료 후, 다른 Mono나 Flux실행
.then()
Mono<Integer> mono1 = Mono.just(1);
Mono<Integer> mono2 = Mono.just(2);
Mono<Void> voidMono = mono1.then(mono2);
Flux에서 일정시간이내 데이터 못받으면 에러발생
.timeout()
Flux<Integer> numbers = Flux.just(1,2,3).delayElements(Duration.ofSeconds(2));
Flux<Integer> timeoutFlux = numbers.timeout(Duration.ofSeconds(1));
Flux에서 지정한 개수만큼의 요소만 취함
.take()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Flux<Integer> limitedFlux = numbers.take(3); // 결과 1,2,3