メインコンテンツまでスキップ

Webflux기본 코드로 달려

0개 또는 1개의 결과를 처리

Mono.just()

Mono<String> monoString = Mono.just("Hello world!");

Iterable객체를 처리

Flux.fromIterable()

List<Integer> numbers = Arrays.asList(1,2,3,4,5);
Flux<Integer> fluxNumbers = Flux.fromIterable(numbers);

비동기 처리후 결과를 Mono로

Mono.fromCallable()

Callable<String> callableTask = () -> "Result from callable task";
Mono<String> monoFromCallable = Mono.fromCallable(callableTask);

여러개의 Flux를 순차적으로 연결

Flux.concat()

Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(4,5,6);
Flux<Integer> concatenatedFlux = Flux.concat(flux1, flux2);

여거래의 Flux를 병합하여 병렬처리

Flux.merge()

Flux<Integer> flux1 = Flux.just(1,3,5);
Flux<Integer> flux2 = Flux.just(2,4,6);
Flux<Integer> mergeFlux = Flux.merge(flux1, flux2);

Mono결합 -> 결과기다림 -> 튜플반환

Mono.zip()

Mono<String> mono1 = Mono.just("Hello");
Mono<String> mono2 = Mono.just("Webflux");
Mono<Tuple2<String, String>> zippedMono = Mono.zip(mono1, mono2);

Flux각 요소에 함수를 적용하여 변환

.map()

Flux<Integer> numbers = Flux.just(1,2,3);
Flux<String> mappedNumbers = numbers.map(num -> "Number: " + num);

Mono의 값을 조건에 따라 필터링함

.filter()

Mono<Integer> numberMono = Mono.just(5);
Mono<Integer> filteredMono = numberMono.filter(num -> num > 3);

Flux의 각 요소에 비동기 함수를 적용후 결과Flux를 병합

.flatMap()

Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> doubledNumbers = numbers.flatMap(num -> Mono.just(num*2));

Flux가 비어있을 때 대체 Flux를 사용

.switchIfEmpty()

Flux<Integer> numbers = Flux.empty();
Flux<Integer> defaultNumbers = Flux.just(1,2,3);
Flux<Integer> result = numbers.switchIfEmpty(defaultNumbers); // result: 1,2,3

Mono에서 에러가 발생 했을때 대체 Mono를 사용

.onErrorResume()

Mono<Integer> numberMono = Mono.just(10)
.map(num -> num / 0)
.onErrorResume(ex -> Mono.just(0));

Flux에서 발생시 조건에 따라 에러무시하고 진행

.onErrorContinue()

Flux<Integer> numbers = Flux.just(1,2,3,4);
Flux<Integer> result = numbers.map(num -> {
if (num == 3) {
throw new RuntimeException("Error!);
}
return num;
})
.onErrorContinue((ex, val) -> {
System.out.println("Error occurred, but continuing with value : " + val);
});

Mono나 Flux에서 각가의 값을 소비하기 전에 사전 처리 수행

.doOnSuccess()

Mono<Integer> numberMono = Mono.just(42);
numberMono = numberMono.doOnSuccess(num -> System.out.println("처리"));

Flux에서 값을 집계하여 단일 결과 Mono로 반환

.reduce()

Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Mono<Integer> sumMono = numbers.reduce(0, (acc,num) -> acc + num);

Flux의 모든 요소를 리스트나 맵으로 수집

.collectMap()

Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Mono<List<Integer>> listMono = numbers.collectList();
Mono<Map<Integer, String>> mapMono = numbers.collectMap(num -> num, num -> "Value : " + num);

주기적으로 값을 생성하는 Flux를 생성

.interval()

Flux<Long> intervalFlux = Flux.interval(Duration.OfSeconds(1));
intervalFlux.subscribe(value -> System.out.println("Received value : " + value));

두 개의 Flux를 조합하여 새로운 Flux를 생성

.zipWith()

Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(10,20,30);
Flux<Tuple2<Integer, Integer>> zippedFlux = flux1.zipWith(flux2);

지정된 개수만큼의 요소를 모아 List형태로 발행

.buffer()

Flux<Integer> numbers = Flux.range(1, 10);
Flux<List<Integer>> bufferedFlux = numbers.buffer(3);
// Output: [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]

지정된 개수만큼의 요소를 가진 작은 Flux들로 구분하여 발행

.window()

Flux<Integer> numbers = Flux.range(1, 10);
Flux<Flux<Integer>> windowedFlux = numbers.window(3);
// Output: Flux[1, 2, 3], Flux[4, 5, 6], Flux[7, 8, 9], Flux[10]

에러가 발생할 경우 재시도

.retry()

Flux<Integer> fluxWithRetry = someFlux.retry(3) // 최대 3번 재시도

Flux각 요소에 비동기 함수 적용, 결과Flux 순차처리

.concatMap()

Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.concatMap(num -> Mono.just(num * 2));

Flux각 요소에 비동기 함수 적용, 결과 Flux병렬처리. 순서보장x

.flatMapSequential()

Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.flatMapSequential(num -> Mono.just(num * 2));

Mono나 Flux실행완료 후, 다른 Mono나 Flux실행

.then()

Mono<Integer> mono1 = Mono.just(1);
Mono<Integer> mono2 = Mono.just(2);
Mono<Void> voidMono = mono1.then(mono2);

Flux에서 일정시간이내 데이터 못받으면 에러발생

.timeout()

Flux<Integer> numbers = Flux.just(1,2,3).delayElements(Duration.ofSeconds(2));
Flux<Integer> timeoutFlux = numbers.timeout(Duration.ofSeconds(1));

Flux에서 지정한 개수만큼의 요소만 취함

.take()

Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Flux<Integer> limitedFlux = numbers.take(3); // 결과 1,2,3

Flux에서 지정한 개수만큼 요소를 건너뜀

.skip()

Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Flux<Integer> skippedFlux = numbers.skip(2); // 결과 3,4,5

중복된 요소를 제거

.distinct()

Flux<Integer> numbers = Flux.just(1,2,3,2,4,3,5);
Flux<Integer> distinctFlux = numbers.distinct(); // 결과 1,2,3,4,5

비동기 조건으로 Flux를 필터링

.filterWhen()

Flux<Integer> numbers = Flux.just(1,2,3,4,5);
Flux<Integer> filteredFlux = numbers.filterWhen(num -> Mono.just(num % 2 == 0)); // 결과 2,3

Flux가 비어있을 때 기본값을 발행

.defaultIfEmpty()

Flux<Integer> emptyFlux = Flux.empty();
Flux<Integer> defaultFlux = emptyFlux.defaultIfEmpty(0); // 결과 0

두개의 Flux를 합침

.mergeWith()

Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux1 = Flux.just(4,5,6);
Flux<Integer> mergedFlux = flux1.mergeWith(flux2) // 결과 1,2,3,4,5,6

에러를 다른형태로 변환

.map()

Flux<Integer> numbers = Flux.just(1,2,3);
Flux<Integer> processedNumbers = numbers.map(num -> num / 0)
.onErrorMap(ArithmeticException.class, ex -> new RuntimeException("Division by zero!));

Flux와 Iterable객체를 조합하여 Tuple형태로 결합

.zipWithIterable()

Flux<Integer> flux = Flux.just(1,2,3);
List<String> names = Arrays.asList("Alice", "Bob", "Charlie");
Flux<Tuple2<Integer, String>> zippedFlux = flux.zipWithIterable(names);

Flux의 모든 요소를 정렬된 리스트로 수집

.collectSortedList()

Flux<Integer> numbers = Flux.just(3,1,4,2,5);
Mono<List<Integer>> sortedList = numbers.collectSortedList();

새로운 값을 생성하고 최신 결과만 유지

.switchMap()

Flux<String> words = Flux.just("apple", "banana", "cherry");
Flux<Integer> switchedFlux = words.switchMap(word -> Mono.just(word.length()));
// 결과 5, 6, 6

병렬로 실행된 결과를 보장하면서 Flux요소를 처리

.flatMapSequential()

Flux<Integer> flux = Flux.just(1,2,3);
Flux<Integer> processedFlux = flux.flatMapSequential(num -> Flux.just(num * 2).delayElements(Duration.ofMillis(num * 100)));
// 결과 2, 4, 6 (딜레이에 의해 병렬로 실행되어도 순서를 보장)

Mono의 결과를 캐싱하여 다시 사용할 수 있도록 한다

.cache()

Mono<String> data = Mono.fromCallable(() -> expensiveDatabaseCall());
Mono<String> cachedData = data.cache();

여러 Flux에서 최신 값들을 조합하여 새로운 결과를 발행

.combineLatest

Flux<Integer> flux1 = Flux.just(1,2,3);
Flux<Integer> flux2 = Flux.just(10,20,30);
Flux<Integer> combinedFlux = Flux.combineLatest(flux1, flux2, (num1, num2) -> num1 + num2);
// 결과 13, 23, 33 (최신 값임 3과 30을 조합, 1+30, 2+30, 3+30)

여러 Flux중 가장 먼저 도착한 Flux를 선택

.first()

Flux<Integer> flux1 = Flux.just(1,2,3).delayElements(Duration.ofSeconds(2));
Flux<Integer> flux2 = Flux.just(10,20,30).delayElements(Duration.ofSeconds(1));
Flux<Integer> firstFlux = Flux.first(flux1, flux2);
// 결과 10, 20, 30 (flux2가 먼저도착)

지정된 주기로 가장 최근의 값만 선택

.sample()

Flux<Integer> numbers = Flux.range(1,10).delayElements(Duration.ofMillis(50));
Flux<Integer> sampledFlux = numbers.sample(Duration.ofSeconds(2));
// 결과 2, 5, 8 (2초마다 최근 값만 선책))

Flux의 요소를 비동기적으로 처리하고 결과를 Iterable형태로 반환

.concatMapIterable()

Flux<Integer> flux = Flux.just(1,2,3);
Flux<String> processedFlux = flux.concatMapIterable(num -> Arrays.asList("A" + num, "B" + num));
// 결과 : A1, B1, A2, B2, A3, B3

에러가 발생할 경우 지정된 조건에 따라 재시도

.retryWhen()

Flux<Integer> fluxWithRetry = someFlux.retryWhen(errors -> errors.zipWith(Flux.range(1, 3), (error, retryCount) -> {
if (retryCount < 3) {
return retryCount;
}
return error;
}).flatMap(errorOrRetryCount -> {
if (errorOrRetryCount instanceof Throwable) {
return Mono.error((Throwable) errorOrRetryCount);
}
return Mono.delay(Duration.ofSeconds(1));
}));

Flux 각요소에 대해 콜백을 제공하여 변환 및 필터링

.handle()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<String> processedNumbers = numbers.handle((num, sink) -> {
if (num % 2 == 0) {
sink.next("Even: " + num);
}
});

Mono의 값을 비동기 조건으로 필터링

.filterWhen()

Mono<Integer> numberMono = Mono.just(5);
Mono<Integer> filteredMono = numberMono.filterWhen(num -> Mono.just(num > 3));

첫번째 요소에대해 특별로직 수행, 나머지Flux반환

.switchOnFirst()

Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> processedFlux = numbers.switchOnFirst((signalType, value) -> {
if (signalType == SignalType.ON_NEXT) {
System.out.println("First element: " + value.get());
}
return value;
});

Flux가 완료되거나 에러가 발생했을 때 특정 동작실행

.doFinally()

Flux<Integer> numbers = Flux.just(1, 2, 3);
numbers = numbers.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
System.out.println("Flux completed.");
}
});

Flux를 반복해서 재생

.repeat()

Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> repeatedFlux = numbers.repeat(2); // 2번 반복

Mono의 값을 무시하고 완료 신호만 유지

.ignoreElement()

Mono<Integer> numberMono = Mono.just(42);
Mono<Void> completionMono = numberMono.ignoreElement();

초기값과 함께 Flux의 요소를 집계하고 결과를 차례로 발행

.scan()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<Integer> accumulatedFlux = numbers.scan((acc, num) -> acc + num);
// 결과: 1, 3, 6, 10, 15

Flux의 각 요소를 비동기처리후 결과를 Iterable형태로 변환

.concatMapIterable()

Flux<Integer> flux = Flux.just(1, 2, 3);
Flux<String> processedFlux = flux.concatMapIterable(num -> Arrays.asList("A" + num, "B" + num));
// 결과: A1, B1, A2, B2, A3, B3

기준에 따라 Flux요소를 그룹핑

.groupBy()

Flux<String> animals = Flux.just("cat", "dog", "cow", "elephant", "camel");
Flux<GroupedFlux<Character, String>> groupedFluxes = animals.groupBy(animal -> animal.charAt(0));

재시도 로직을 사용하고 에러를 반환

.retryWhen()

Flux<Integer> fluxWithRetryAndErrorHandling = someFlux.retryWhen(errors -> errors.zipWith(Flux.range(1, 3), (error, retryCount) -> {
if (retryCount < 3) {
return retryCount;
}
return error;
}).flatMap(errorOrRetryCount -> {
if (errorOrRetryCount instanceof Throwable) {
return Mono.error((Throwable) errorOrRetryCount);
}
return Mono.delay(Duration.ofSeconds(1));
})).onErrorMap(IllegalStateException.class, ex -> new CustomException("Custom error occurred.", ex));

Flux와 Mono를 연결하고 모든 작업이 완료될 때까지 기다림

.concatWith()

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono1 = Mono.just(7);
Mono<Integer> mono2 = Mono.just(8);

Flux<Integer> concatenatedFlux = flux1.concatWith(flux2).concatWith(Mono.when(mono1, mono2));

에러가 발생할 경우 대체 Flux로 처리

.onErrorResume()

Flux<Integer> fluxWithFallback = someFlux.map(num -> num / 0)
.onErrorResume(ex -> Flux.range(1, 5)); // 에러가 발생하면 대체 Flux로 처리

특정 조건에 따라 Flux의 요소를 버퍼링

.bufferUntil() / .bufferWhile()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9);
Flux<List<Integer>> bufferedUntilFlux = numbers.bufferUntil(num -> num % 3 == 0); // 3의 배수일 때 버퍼링
Flux<List<Integer>> bufferedWhileFlux = numbers.bufferWhile(num -> num % 2 == 0); // 짝수일 때 버퍼링

Flux의 데이터 생성과 구독이 다른스레드에서 수행

어렵..

Flux<Integer> flux = Flux.range(1, 5)
.map(num -> {
System.out.println("Mapping on thread: " + Thread.currentThread().getName());
return num * 2;
})
.publishOn(Schedulers.parallel()) // 데이터 생성과 관련된 작업을 병렬 스레드로 수행
.map(num -> {
System.out.println("Mapping again on thread: " + Thread.currentThread().getName());
return num + 1;
})
.subscribeOn(Schedulers.single()); // 구독과 관련된 작업을 단일 스레드로 수행

flux.subscribe(System.out::println);

Backpressure를 처리

.onBackpressureBuffer()

Flux<Integer> sourceFlux = Flux.range(1, 1000);
Flux<Integer> bufferedFlux = sourceFlux.onBackpressureBuffer(100); // 최대 100개의 요소만 버퍼링하도록 설정

동적으로 요청하고 응답을 처리

Flux<String> requests = Flux.just("request-1", "request-2", "request-3");
Flux<String> processedResponses = requests.switchMap(request -> {
// request를 기반으로 비동기 작업 수행 (예: 외부 API 호출)
return WebClient.create().get()
.uri("https://api.example.com/data?query=" + request)
.retrieve()
.bodyToMono(String.class);
});

Flux의 최신 값을 조합

.combineLatest()

Flux<Long> interval1 = Flux.interval(Duration.ofSeconds(1));
Flux<Long> interval2 = Flux.interval(Duration.ofMillis(500));

Flux<Tuple2<Long, Long>> combinedFlux = Flux.combineLatest(interval1, interval2, Tuple2::of);
// 두 Flux 중 하나라도 값이 갱신되면 최신 값을 조합하여 Tuple을 발행합니다.

Flux<Tuple2<Long, Long>> withLatestFromFlux = interval1.withLatestFrom(interval2, Tuple2::of);
// 첫 번째 Flux의 값이 갱신될 때마다 두 번째 Flux의 최신 값을 조합하여 Tuple을 발행합니다.

반복을 컨트롤

어렵..

AtomicInteger counter = new AtomicInteger();

Flux<String> dataFlux = Flux.defer(() -> {
int count = counter.getAndIncrement();
return count < 5 ? Flux.just("Data-" + count) : Flux.empty();
});

dataFlux.repeatWhen(companion -> companion.take(3).delayElements(Duration.ofSeconds(2)))
.subscribe(System.out::println);

구독자가 동일한 시퀀스를 공유

어렵다..

Flux<Integer> sourceFlux = Flux.range(1, 5)
.doOnSubscribe(subscription -> System.out.println("New subscriber connected"));

ConnectableFlux<Integer> connectableFlux = sourceFlux.publish();

connectableFlux.subscribe(num -> System.out.println("Subscriber 1: " + num));
connectableFlux.subscribe(num -> System.out.println("Subscriber 2: " + num));

connectableFlux.connect();

Flux의 중복 로직을 재사용

.compose()

Function<Flux<String>, Flux<String>> toUpperCase = stringFlux -> stringFlux.map(String::toUpperCase);
Function<Flux<String>, Flux<String>> trimWhitespace = stringFlux -> stringFlux.map(String::trim);

Flux<String> dataFlux = Flux.just(" hello ", " world ");
dataFlux.compose(trimWhitespace).compose(toUpperCase).subscribe(System.out::println);

Flux의 각 요소 사의 시간 간격을 측정, 새로운 Flux생성

.elapsed()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<Tuple2<Integer, Long>> elapsedFlux = numbers
.elapsed()
.doOnNext(tuple -> System.out.println("Elapsed: " + tuple.getT2() + "ms"));

두개의 Flux중 하나가 요소를 발행하면 다른 Flux취소

.takeUntilOther()

Flux<Integer> sourceFlux = Flux.just(1, 2, 3, 4, 5);
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(3));

Flux<Integer> takeUntilOtherFlux = sourceFlux.takeUntilOther(durationFlux);
takeUntilOtherFlux.subscribe(System.out::println);

지정범위 내에서 순차적인 숫자 발행

.range()

Flux<Integer> rangeFlux = Flux.range(1, 5);
rangeFlux.subscribe(System.out::println);

에러발생시 대체값

.onErrorReturn()

Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> fluxWithFallback = numbers.map(num -> num / 0)
.onErrorReturn(0); // 에러가 발생하면 대체 값 0으로 처리

지정된 시간마다 새로운 Flux생성

.windowTimeout()

Flux<Integer> numbers = Flux.range(1, 10);
Flux<Flux<Integer>> windowedFlux = numbers.windowTimeout(3, Duration.ofSeconds(2));
windowedFlux.subscribe(window -> window.collectList().subscribe(System.out::println));

두개의 Flux를 병합

.zip()

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<String> flux2 = Flux.just("A", "B", "C");

Flux<Tuple2<Integer, String>> zippedFlux = Flux.zip(flux1, flux2);
zippedFlux.subscribe(tuple -> System.out.println(tuple.getT1() + tuple.getT2()));

Flux중 가장 먼저 도착한 Flux를 선택

.firstEmitting()

Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(2));
Flux<Integer> flux2 = Flux.just(10, 20, 30).delayElements(Duration.ofSeconds(1));

Flux<Integer> firstFlux = Flux.firstEmitting(flux1, flux2);
firstFlux.subscribe(System.out::println);

Flux에서 단 하나의 요소를 가져옴

.single()

Flux<Integer> numbers = Flux.just(42);
numbers.single()
.doOnSuccess(num -> System.out.println("Single value: " + num))
.doOnError(ex -> System.out.println("Error: " + ex.getMessage()))
.subscribe();

Flux의 요소를 하나의 결과로 줄임

.reduce()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Mono<Integer> sum = numbers.reduce((x, y) -> x + y);
sum.subscribe(result -> System.out.println("Sum: " + result));

여러 Flux를 합침

.merge()

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux<Integer> mergedFlux = Flux.merge(flux1, flux2);
mergedFlux.subscribe(System.out::println);

Flux<Integer> concatenatedFlux = Flux.concat(flux1, flux2);
concatenatedFlux.subscribe(System.out::println);

Flux에서 조건에 만족하는 요소를 확인

.any()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.any(num -> num > 3)
.subscribe(result -> System.out.println("Any number greater than 3: " + result));

Flux에서 모든 요소가 조건에 만족하는지 확인

.all()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.all(num -> num > 0)
.subscribe(result -> System.out.println("All numbers are greater than 0: " + result));

Flux의 모든 요소를 리스트로 수집

.collectList()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.collectList()
.subscribe(list -> System.out.println("List of numbers: " + list));

Flux의 요소를 키-값 쌍으로 수집하여 Map으로 반환

.collectMap()

Flux<String> animals = Flux.just("cat", "dog", "elephant", "giraffe");
animals.collectMap(animal -> animal.charAt(0))
.subscribe(map -> System.out.println("Animals map: " + map));

초기값과 함께 Flux의 요소를 집계하고 결과를 발행

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
numbers.scanWith(() -> 0, (acc, num) -> acc + num)
.subscribe(result -> System.out.println("Accumulated sum: " + result));

주어진 시간 간격으로 최근에 발행된 Flux의 요소를 샘플링

.sample()

Flux.interval(Duration.ofMillis(200))
.sample(Duration.ofSeconds(1))
.take(5)
.subscribe(System.out::println);

Flux가 완료되었을 때 지정된 조건에 따라 반복

.repeatWhenEmpty()

Flux<Integer> flux = Flux.just(1, 2, 3).concatWith(Flux.empty());
flux.repeatWhenEmpty(companion -> companion.take(2)).subscribe(System.out::println);

에러가 발생하더라도 모든 Flux를 처리

.concatMapDelayError

Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> processedFlux = numbers.concatMapDelayError(num -> {
if (num == 2) {
return Mono.error(new RuntimeException("Error processing number 2"));
}
return Mono.just(num * 2);
}, 1); // 1로 설정하여 에러 발생 시 해당 에러만 처리하고 나머지 Flux는 에러 발생 이후에 처리

지정된 시간만큼 구독을 지연

.delaySubscription()

Flux<Integer> numbers = Flux.range(1, 5);
numbers.delaySubscription(Duration.ofSeconds(3)).subscribe(System.out::println);

Flux각 요소를 비동기처리후 결과 Iterable형태로 반환

.concatMapIterable()

Flux<Integer> flux = Flux.just(1, 2, 3);
Flux<String> processedFlux = flux.concatMapIterable(num -> Arrays.asList("A" + num, "B" + num));

지정된 조건에 따라 Flux를 반복

.repeatWhen()

Flux<Integer> flux = Flux.range(1, 3);
Flux<Integer> repeatedFlux = flux.repeatWhen(companion -> companion.take(2));

각 요청마다 새로운 Flux를 생성하여 이전요청 취소

.switchMap()

Flux<String> requests = Flux.just("request-1", "request-2", "request-3");
Flux<String> processedResponses = requests.switchMap(request -> {
// request를 기반으로 비동기 작업 수행 (예: 외부 API 호출)
return WebClient.create().get()
.uri("https://api.example.com/data?query=" + request)
.retrieve()
.bodyToMono(String.class);
});

배압 처리

.onBackpressureBuffer()

Flux<Integer> sourceFlux = Flux.range(1, 1000);
Flux<Integer> bufferedFlux = sourceFlux.onBackpressureBuffer(100); // 최대 100개의 요소만 버퍼링하도록 설정

첫번째 요소에 대해 로직 수행후, 나머지 Flux반환

.switchOnFirst()

Flux<Integer> numbers = Flux.just(1, 2, 3);
Flux<Integer> processedFlux = numbers.switchOnFirst((signalType, value) -> {
if (signalType == SignalType.ON_NEXT) {
System.out.println("First element: " + value.get());
}
return value;
});

Flux각 요소에 콜백을 제공하여 변환 또는 필터링

.handle()

Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5);
Flux<String> processedNumbers = numbers.handle((num, sink) -> {
if (num % 2 == 0) {
sink.next("Even: " + num);
}
});

Flux가 완료되거나 에러발생시 특정 동작 실행

.doFinally()

Flux<Integer> numbers = Flux.just(1, 2, 3);
numbers = numbers.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
System.out.println("Flux completed.");
}
});

Flux와 Mono를 연결하고 모든 작업이 완료될때까지 기다림

.concatWith()

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono1 = Mono.just(7);
Mono<Integer> mono2 = Mono.just(8);

Flux<Integer> concatenatedFlux = flux1.concatWith(flux2).concatWith(Mono.when(mono1, mono2));

특정 기준에 따라 Flux의 요소를 그룹핑

.groupBy()

Flux<String> animals = Flux.just("cat", "dog", "cow", "elephant", "camel");
Flux<GroupedFlux<Character, String>> groupedFluxes = animals.groupBy(animal -> animal.charAt(0));