Webflux기본 코드로 달려
0개 또는 1개의 결과를 처리
Mono.just()
Mono<String> monoString = Mono.just("Hello world!");
Iterable객체를 처리
Flux.fromIterable()
List<Integer> numbers = Arrays.asList(1,2,3,4,5);
Flux<Integer> fluxNumbers = Flux.fromIterable(numbers);
비동기 처리후 결과를 Mono로
Mono.fromCallable()
Callable<String> callableTask = () -> "Result from callable task";
Mono<String> monoFromCallable = Mono.fromCallable(callableTask);
여러개의 Flux를 순차적으로 연결
Flux.concat()
Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(4,5,6);
Flux<Integer> concatenatedFlux = Flux.concat(flux1, flux2);
여거래의 Flux를 병합하여 병렬처리
Flux.merge()
Flux<Integer> flux1 = Flux.just(1,3,5);
Flux<Integer> flux2 = Flux.just(2,4,6);
Flux<Integer> mergeFlux = Flux.merge(flux1, flux2);
Mono결합 -> 결과기다림 -> 튜플반환
Mono.zip()
Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("Webflux");
Mono<Tuple2<String, String>> zippedMono = Mono.zip(mono1, mono2);
Flux각 요소에 함수를 적용하여 변환
.map()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<String> mappedNumbers = numbers.map(num -> "Number: " + num);
Mono의 값을 조건에 따라 필터링함
.filter()
Mono<Integer> numberMono = Mono.just(5);
Mono<Integer> filteredMono = numberMono.filter(num -> num > 3);
Flux의 각 요소에 비동기 함수를 적용후 결과Flux를 병합
.flatMap()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> doubledNumbers = numbers.flatMap(num -> Mono.just(num*2));
Flux가 비어있을 때 대체 Flux를 사용
.switchIfEmpty()
Flux<Integer> numbers = Flux.empty();
Flux<Integer> defaultNumbers = Flux.just(1,2,3);
Flux<Integer> result = numbers.switchIfEmpty(defaultNumbers); // result: 1,2,3
Mono에서 에러가 발생 했을때 대체 Mono를 사용
.onErrorResume()
Mono<Integer> numberMono = Mono.just(10)
.map(num -> num / 0)
.onErrorResume(ex -> Mono.just(0));
Flux에서 발생시 조건에 따라 에러무시하고 진행
.onErrorContinue()
Flux<Integer> numbers = Flux.just(1,2,3,4);
Flux<Integer> result = numbers.map(num -> {
if (num == 3) {
throw new RuntimeException("Error!);
}
return num;
})
.onErrorContinue((ex, val) -> {
System.out.println("Error occurred, but continuing with value : " + val);
});
Mono나 Flux에서 각가의 값을 소비하기 전에 사전 처리 수행
.doOnSuccess()
Mono<Integer> numberMono = Mono.just(42);
numberMono = numberMono.doOnSuccess(num -> System.out.println("처리"));
Flux에서 값을 집계하여 단일 결과 Mono로 반환
.reduce()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Mono<Integer> sumMono = numbers.reduce(0, (acc,num) -> acc + num);
Flux의 모든 요소를 리스트나 맵으로 수집
.collectMap()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Mono<List<Integer>> listMono = numbers.collectList();
Mono<Map<Integer, String>> mapMono = numbers.collectMap(num -> num, num -> "Value : " + num);
주기적으로 값을 생성하는 Flux를 생성
.interval()
Flux<Long> intervalFlux = Flux.interval(Duration.OfSeconds(1));
intervalFlux.subscribe(value -> System.out.println("Received value : " + value));
두 개의 Flux를 조합하여 새로운 Flux를 생성
.zipWith()
Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(10,20,30);
Flux<Tuple2<Integer, Integer>> zippedFlux = flux1.zipWith(flux2);
지정된 개수만큼의 요소를 모아 List형태로 발행
.buffer()
Flux<Integer> numbers = Flux.range(1, 10);
Flux<List<Integer>> bufferedFlux = numbers.buffer(3);
// Output: [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
지정된 개수만큼의 요소를 가진 작은 Flux들로 구분하여 발행
.window()
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Flux<Integer>> windowedFlux = numbers.window(3);
// Output: Flux[1, 2, 3], Flux[4, 5, 6], Flux[7, 8, 9], Flux[10]
에러가 발생할 경우 재시도
.retry()
Flux<Integer> fluxWithRetry = someFlux.retry(3) // 최대 3번 재시도
Flux각 요소에 비동기 함수 적용, 결과Flux 순차처리
.concatMap()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.concatMap(num -> Mono.just(num * 2));
Flux각 요소에 비동기 함수 적용, 결과 Flux병렬처리. 순서보장x
.flatMapSequential()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.flatMapSequential(num -> Mono.just(num * 2));
Mono나 Flux실행완료 후, 다른 Mono나 Flux실행
.then()
Mono<Integer> mono1 = Mono.just(1);
Mono<Integer> mono2 = Mono.just(2);
Mono<Void> voidMono = mono1.then(mono2);
Flux에서 일정시간이내 데이터 못받으면 에러발생
.timeout()
Flux<Integer> numbers = Flux.just(1,2,3).delayElements(Duration.ofSeconds(2));
Flux<Integer> timeoutFlux = numbers.timeout(Duration.ofSeconds(1));
Flux에서 지정한 개수만큼의 요소만 취함
.take()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Flux<Integer> limitedFlux = numbers.take(3); // 결과 1,2,3
Flux에서 지정한 개수만큼 요소를 건너뜀
.skip()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Flux<Integer> skippedFlux = numbers.skip(2); // 결과 3,4,5
중복된 요소를 제거
.distinct()
Flux<Integer> numbers = Flux.just(1,2,3,2,4,3,5);
Flux<Integer> distinctFlux = numbers.distinct(); // 결과 1,2,3,4,5
비동기 조건으로 Flux를 필터링
.filterWhen()
Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Flux<Integer> filteredFlux = numbers.filterWhen(num -> Mono.just(num % 2 == 0)); // 결과 2,3
Flux가 비어있을 때 기본값을 발행
.defaultIfEmpty()
Flux<Integer> emptyFlux = Flux.empty();
Flux<Integer> defaultFlux = emptyFlux.defaultIfEmpty(0); // 결과 0
두개의 Flux를 합침
.mergeWith()
Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux1 = Flux.just(4,5,6);
Flux<Integer> mergedFlux = flux1.mergeWith(flux2) // 결과 1,2,3,4,5,6
에러를 다른형태로 변환
.map()
Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.map(num -> num / 0)
.onErrorMap(ArithmeticException.class, ex -> new RuntimeException("Division by zero!));
Flux와 Iterable객체를 조합하여 Tuple형태로 결합
.zipWithIterable()
Flux<Integer> flux = Flux.just(1,2,3);
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
Flux<Tuple2<Integer, String>> zippedFlux = flux.zipWithIterable(names);
Flux의 모든 요소를 정렬된 리스트로 수집
.collectSortedList()
Flux<Integer> numbers = Flux.just(3,1,4,2,5);
Mono<List<Integer>> sortedList = numbers.collectSortedList();
새로운 값을 생성하고 최신 결과만 유지
.switchMap()
Flux<String> words = Flux.just("apple", "banana", "cherry");
Flux<Integer> switchedFlux = words.switchMap(word -> Mono.just(word.length()));
// 결과 5, 6, 6
병렬로 실행된 결과를 보장하면서 Flux요소를 처리
.flatMapSequential()
Flux<Integer> flux = Flux.just(1,2,3);
Flux<Integer> processedFlux = flux.flatMapSequential(num -> Flux.just(num * 2).delayElements(Duration.ofMillis(num * 100)));
// 결과 2, 4, 6 (딜레이에 의해 병렬로 실행되어도 순서를 보장)
Mono의 결과를 캐싱하여 다시 사용할 수 있도록 한다
.cache()
Mono<String> data = Mono.fromCallable(() -> expensiveDatabaseCall());
Mono<String> cachedData = data.cache();
여러 Flux에서 최신 값들을 조합하여 새로운 결과를 발행
.combineLatest
Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(10,20,30);
Flux<Integer> combinedFlux = Flux.combineLatest(flux1, flux2, (num1, num2) -> num1 + num2);
// 결과 13, 23, 33 (최신 값임 3과 30을 조합, 1+30, 2+30, 3+30)
여러 Flux중 가장 먼저 도착한 Flux를 선택
.first()
Flux<Integer> flux1 = Flux.just(1,2,3).delayElements(Duration.ofSeconds(2));
Flux<Integer> flux2 = Flux.just(10,20,30).delayElements(Duration.ofSeconds(1));
Flux<Integer> firstFlux = Flux.first(flux1, flux2);
// 결과 10, 20, 30 (flux2가 먼저도착)
지정된 주기로 가장 최근의 값만 선택
.sample()
Flux<Integer> numbers = Flux.range(1,10).delayElements(Duration.ofMillis(50));
Flux<Integer> sampledFlux = numbers.sample(Duration.ofSeconds(2));
// 결과 2, 5, 8 (2초마다 최근 값만 선책))
Flux의 요소를 비동기적으로 처리하고 결과를 Iterable형태로 반환
.concatMapIterable()
Flux<Integer> flux = Flux.just(1,2,3);
Flux<String> processedFlux = flux.concatMapIterable(num -> Arrays.asList("A" + num, "B" + num));
// 결과 : A1, B1, A2, B2, A3, B3
에러가 발생할 경우 지정된 조건에 따라 재시도
.retryWhen()
Flux<Integer> fluxWithRetry = someFlux.retryWhen(errors -> errors.zipWith(Flux.range(1, 3), (error, retryCount) -> {
if (retryCount < 3) {
return retryCount;
}
return error;
}).flatMap(errorOrRetryCount -> {
if (errorOrRetryCount instanceof Throwable) {
return Mono.error((Throwable) errorOrRetryCount);
}
return Mono.delay(Duration.ofSeconds(1));
}));
Flux 각요소에 대해 콜백을 제공하여 변환 및 필터링
.handle()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<String> processedNumbers = numbers.handle((num, sink) -> {
if (num % 2 == 0) {
sink.next("Even: " + num);
}
});
Mono의 값을 비동기 조건으로 필터링
.filterWhen()
Mono<Integer> numberMono = Mono.just(5);
Mono<Integer> filteredMono = numberMono.filterWhen(num -> Mono.just(num > 3));
첫번째 요소에대해 특별로직 수행, 나머지Flux반환
.switchOnFirst()
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> processedFlux = numbers.switchOnFirst((signalType, value) -> {
if (signalType == SignalType.ON_NEXT) {
System.out.println("First element: " + value.get());
}
return value;
});
Flux가 완료되거나 에러가 발생했을 때 특정 동작실행
.doFinally()
Flux<Integer> numbers = Flux.just(1, 2, 3);
numbers = numbers.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
System.out.println("Flux completed.");
}
});
Flux를 반복해서 재생
.repeat()
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> repeatedFlux = numbers.repeat(2); // 2번 반복
Mono의 값을 무시하고 완료 신호만 유지
.ignoreElement()
Mono<Integer> numberMono = Mono.just(42);
Mono<Void> completionMono = numberMono.ignoreElement();
초기값과 함께 Flux의 요소를 집계하고 결과를 차례로 발행
.scan()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> accumulatedFlux = numbers.scan((acc, num) -> acc + num);
// 결과: 1, 3, 6, 10, 15
Flux의 각 요소를 비동기처리후 결과를 Iterable형태로 변환
.concatMapIterable()
Flux<Integer> flux = Flux.just(1, 2, 3);
Flux<String> processedFlux = flux.concatMapIterable(num -> Arrays.asList("A" + num, "B" + num));
// 결과: A1, B1, A2, B2, A3, B3
기준에 따라 Flux요소를 그룹핑
.groupBy()
Flux<String> animals = Flux.just("cat", "dog", "cow", "elephant", "camel");
Flux<GroupedFlux<Character, String>> groupedFluxes = animals.groupBy(animal -> animal.charAt(0));
재시도 로직을 사용하고 에러를 반환
.retryWhen()
Flux<Integer> fluxWithRetryAndErrorHandling = someFlux.retryWhen(errors -> errors.zipWith(Flux.range(1, 3), (error, retryCount) -> {
if (retryCount < 3) {
return retryCount;
}
return error;
}).flatMap(errorOrRetryCount -> {
if (errorOrRetryCount instanceof Throwable) {
return Mono.error((Throwable) errorOrRetryCount);
}
return Mono.delay(Duration.ofSeconds(1));
})).onErrorMap(IllegalStateException.class, ex -> new CustomException("Custom error occurred.", ex));
Flux와 Mono를 연결하고 모든 작업이 완료될 때까지 기다림
.concatWith()
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono1 = Mono.just(7);
Mono<Integer> mono2 = Mono.just(8);
Flux<Integer> concatenatedFlux = flux1.concatWith(flux2).concatWith(Mono.when(mono1, mono2));
에러가 발생할 경우 대체 Flux로 처리
.onErrorResume()
Flux<Integer> fluxWithFallback = someFlux.map(num -> num / 0)
.onErrorResume(ex -> Flux.range(1, 5)); // 에러가 발생하면 대체 Flux로 처리
특정 조건에 따라 Flux의 요소를 버퍼링
.bufferUntil() / .bufferWhile()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9);
Flux<List<Integer>> bufferedUntilFlux = numbers.bufferUntil(num -> num % 3 == 0); // 3의 배수일 때 버퍼링
Flux<List<Integer>> bufferedWhileFlux = numbers.bufferWhile(num -> num % 2 == 0); // 짝수일 때 버퍼링
Flux의 데이터 생성과 구독이 다른스레드에서 수행
어렵..
Flux<Integer> flux = Flux.range(1, 5)
.map(num -> {
System.out.println("Mapping on thread: " + Thread.currentThread().getName());
return num * 2;
})
.publishOn(Schedulers.parallel()) // 데이터 생성과 관련된 작업을 병렬 스레드로 수행
.map(num -> {
System.out.println("Mapping again on thread: " + Thread.currentThread().getName());
return num + 1;
})
.subscribeOn(Schedulers.single()); // 구독과 관련된 작업을 단일 스레드로 수행
flux.subscribe(System.out::println);
Backpressure를 처리
.onBackpressureBuffer()
Flux<Integer> sourceFlux = Flux.range(1, 1000);
Flux<Integer> bufferedFlux = sourceFlux.onBackpressureBuffer(100); // 최대 100개의 요소만 버퍼링하도록 설정
동적으로 요청하고 응답을 처리
Flux<String> requests = Flux.just("request-1", "request-2", "request-3");
Flux<String> processedResponses = requests.switchMap(request -> {
// request를 기반으로 비동기 작업 수행 (예: 외부 API 호출)
return WebClient.create().get()
.uri("https://api.example.com/data?query=" + request)
.retrieve()
.bodyToMono(String.class);
});
Flux의 최신 값을 조합
.combineLatest()
Flux<Long> interval1 = Flux.interval(Duration.ofSeconds(1));
Flux<Long> interval2 = Flux.interval(Duration.ofMillis(500));
Flux<Tuple2<Long, Long>> combinedFlux = Flux.combineLatest(interval1, interval2, Tuple2::of);
// 두 Flux 중 하나라도 값이 갱신되면 최신 값을 조합하여 Tuple을 발행합니다.
Flux<Tuple2<Long, Long>> withLatestFromFlux = interval1.withLatestFrom(interval2, Tuple2::of);
// 첫 번째 Flux의 값이 갱신될 때마다 두 번째 Flux의 최신 값을 조합하여 Tuple을 발행합니다.
반복을 컨트롤
어렵..
AtomicInteger counter = new AtomicInteger();
Flux<String> dataFlux = Flux.defer(() -> {
int count = counter.getAndIncrement();
return count < 5 ? Flux.just("Data-" + count) : Flux.empty();
});
dataFlux.repeatWhen(companion -> companion.take(3).delayElements(Duration.ofSeconds(2)))
.subscribe(System.out::println);
구독자가 동일한 시퀀스를 공유
어렵다..
Flux<Integer> sourceFlux = Flux.range(1, 5)
.doOnSubscribe(subscription -> System.out.println("New subscriber connected"));
ConnectableFlux<Integer> connectableFlux = sourceFlux.publish();
connectableFlux.subscribe(num -> System.out.println("Subscriber 1: " + num));
connectableFlux.subscribe(num -> System.out.println("Subscriber 2: " + num));
connectableFlux.connect();
Flux의 중복 로직을 재사용
.compose()
Function<Flux<String>, Flux<String>> toUpperCase = stringFlux -> stringFlux.map(String::toUpperCase);
Function<Flux<String>, Flux<String>> trimWhitespace = stringFlux -> stringFlux.map(String::trim);
Flux<String> dataFlux = Flux.just(" hello ", " world ");
dataFlux.compose(trimWhitespace).compose(toUpperCase).subscribe(System.out::println);
Flux의 각 요소 사의 시간 간격을 측정, 새로운 Flux생성
.elapsed()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<Tuple2<Integer, Long>> elapsedFlux = numbers
.elapsed()
.doOnNext(tuple -> System.out.println("Elapsed: " + tuple.getT2() + "ms"));
두개의 Flux중 하나가 요소를 발행하면 다른 Flux취소
.takeUntilOther()
Flux<Integer> sourceFlux = Flux.just(1, 2, 3, 4, 5);
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(3));
Flux<Integer> takeUntilOtherFlux = sourceFlux.takeUntilOther(durationFlux);
takeUntilOtherFlux.subscribe(System.out::println);
지정범위 내에서 순차적인 숫자 발행
.range()
Flux<Integer> rangeFlux = Flux.range(1, 5);
rangeFlux.subscribe(System.out::println);
에러발생시 대체값
.onErrorReturn()
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> fluxWithFallback = numbers.map(num -> num / 0)
.onErrorReturn(0); // 에러가 발생하면 대체 값 0으로 처리
지정된 시간마다 새로운 Flux생성
.windowTimeout()
Flux<Integer> numbers = Flux.range(1, 10);
Flux<Flux<Integer>> windowedFlux = numbers.windowTimeout(3, Duration.ofSeconds(2));
windowedFlux.subscribe(window -> window.collectList().subscribe(System.out::println));
두개의 Flux를 병합
.zip()
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<String> flux2 = Flux.just("A", "B", "C");
Flux<Tuple2<Integer, String>> zippedFlux = Flux.zip(flux1, flux2);
zippedFlux.subscribe(tuple -> System.out.println(tuple.getT1() + tuple.getT2()));
Flux중 가장 먼저 도착한 Flux를 선택
.firstEmitting()
Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2));
Flux<Integer> flux2 = Flux.just(10, 20, 30).delayElements(Duration.ofSeconds(1));
Flux<Integer> firstFlux = Flux.firstEmitting(flux1, flux2);
firstFlux.subscribe(System.out::println);
Flux에서 단 하나의 요소를 가져옴
.single()
Flux<Integer> numbers = Flux.just(42);
numbers.single()
.doOnSuccess(num -> System.out.println("Single value: " + num))
.doOnError(ex -> System.out.println("Error: " + ex.getMessage()))
.subscribe();
Flux의 요소를 하나의 결과로 줄임
.reduce()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Mono<Integer> sum = numbers.reduce((x, y) -> x + y);
sum.subscribe(result -> System.out.println("Sum: " + result));
여러 Flux를 합침
.merge()
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> mergedFlux = Flux.merge(flux1, flux2);
mergedFlux.subscribe(System.out::println);
Flux<Integer> concatenatedFlux = Flux.concat(flux1, flux2);
concatenatedFlux.subscribe(System.out::println);
Flux에서 조건에 만족하는 요소를 확인
.any()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.any(num -> num > 3)
.subscribe(result -> System.out.println("Any number greater than 3: " + result));
Flux에서 모든 요소가 조건에 만족하는지 확인
.all()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.all(num -> num > 0)
.subscribe(result -> System.out.println("All numbers are greater than 0: " + result));
Flux의 모든 요소를 리스트로 수집
.collectList()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.collectList()
.subscribe(list -> System.out.println("List of numbers: " + list));
Flux의 요소를 키-값 쌍으로 수집하여 Map으로 반환
.collectMap()
Flux<String> animals = Flux.just("cat", "dog", "elephant", "giraffe");
animals.collectMap(animal -> animal.charAt(0))
.subscribe(map -> System.out.println("Animals map: " + map));
초기값과 함께 Flux의 요소를 집계하고 결과를 발행
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.scanWith(() -> 0, (acc, num) -> acc + num)
.subscribe(result -> System.out.println("Accumulated sum: " + result));
주어진 시간 간격으로 최근에 발행된 Flux의 요소를 샘플링
.sample()
Flux.interval(Duration.ofMillis(200))
.sample(Duration.ofSeconds(1))
.take(5)
.subscribe(System.out::println);
Flux가 완료되었을 때 지정된 조건에 따라 반복
.repeatWhenEmpty()
Flux<Integer> flux = Flux.just(1, 2, 3).concatWith(Flux.empty());
flux.repeatWhenEmpty(companion -> companion.take(2)).subscribe(System.out::println);
에러가 발생하더라도 모든 Flux를 처리
.concatMapDelayError
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> processedFlux = numbers.concatMapDelayError(num -> {
if (num == 2) {
return Mono.error(new RuntimeException("Error processing number 2"));
}
return Mono.just(num * 2);
}, 1); // 1로 설정하여 에러 발생 시 해당 에러만 처리하고 나머지 Flux는 에러 발생 이후에 처리
지정된 시간만큼 구독을 지연
.delaySubscription()
Flux<Integer> numbers = Flux.range(1, 5);
numbers.delaySubscription(Duration.ofSeconds(3)).subscribe(System.out::println);
Flux각 요소를 비동기처리후 결과 Iterable형태로 반환
.concatMapIterable()
Flux<Integer> flux = Flux.just(1, 2, 3);
Flux<String> processedFlux = flux.concatMapIterable(num -> Arrays.asList("A" + num, "B" + num));
지정된 조건에 따라 Flux를 반복
.repeatWhen()
Flux<Integer> flux = Flux.range(1, 3);
Flux<Integer> repeatedFlux = flux.repeatWhen(companion -> companion.take(2));
각 요청마다 새로운 Flux를 생성하여 이전요청 취소
.switchMap()
Flux<String> requests = Flux.just("request-1", "request-2", "request-3");
Flux<String> processedResponses = requests.switchMap(request -> {
// request를 기반으로 비동기 작업 수행 (예: 외부 API 호출)
return WebClient.create().get()
.uri("https://api.example.com/data?query=" + request)
.retrieve()
.bodyToMono(String.class);
});
배압 처리
.onBackpressureBuffer()
Flux<Integer> sourceFlux = Flux.range(1, 1000);
Flux<Integer> bufferedFlux = sourceFlux.onBackpressureBuffer(100); // 최대 100개의 요소만 버퍼링하도록 설정
첫번째 요소에 대해 로직 수행후, 나머지 Flux반환
.switchOnFirst()
Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> processedFlux = numbers.switchOnFirst((signalType, value) -> {
if (signalType == SignalType.ON_NEXT) {
System.out.println("First element: " + value.get());
}
return value;
});
Flux각 요소에 콜백을 제공하여 변환 또는 필터링
.handle()
Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<String> processedNumbers = numbers.handle((num, sink) -> {
if (num % 2 == 0) {
sink.next("Even: " + num);
}
});
Flux가 완료되거나 에러발생시 특정 동작 실행
.doFinally()
Flux<Integer> numbers = Flux.just(1, 2, 3);
numbers = numbers.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
System.out.println("Flux completed.");
}
});
Flux와 Mono를 연결하고 모든 작업이 완료될때까지 기다림
.concatWith()
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono1 = Mono.just(7);
Mono<Integer> mono2 = Mono.just(8);
Flux<Integer> concatenatedFlux = flux1.concatWith(flux2).concatWith(Mono.when(mono1, mono2));
특정 기준에 따라 Flux의 요소를 그룹핑
.groupBy()
Flux<String> animals = Flux.just("cat", "dog", "cow", "elephant", "camel");
Flux<GroupedFlux<Character, String>> groupedFluxes = animals.groupBy(animal -> animal.charAt(0));