Backpressure
배압 설명
배압(Backpressure)은 리액티브 프로그래밍에서 중요한 개념으로, 소스(Flux 또는 Mono)가 능동적으로 데이터를 발행하는 속도가 구독자(subscriber)가 데이터를 처리하는 속도보다 빠르거나 높을 때 발생합니다. 이 경우 구독자는 처리할 수 있는 양 이상의 데이터를 처리해야 하므로 메모리 부담이 커지고 애플리케이션 성능이 저하될 수 있습니다.
스프링 WebFlux에서 배압은 다음과 같은 상황에서 발생할 수 있습니다:
- 빠른 속도로 데이터를 발생시키는 소스(Flux 또는 Mono)와 느린 처리 속도로 데이터를 처리하는 구독자 간의 불균형이 있을 때.
- 느린 리액터 파이프라인이 있는 경우(예: I/O 바운드 작업, 외부 API 호출). 배압을 처리하기 위한 방법으로 스프링 WebFlux는 다음과 같은 전략들을 제공합니다:
-
버퍼링: 구독자가 데이터를 처리할 때까지 데이터를 버퍼링하여 대기시키는 방법입니다. 버퍼링은 메모리 사용량을 높일 수 있으므로 유의해야 합니다.
-
드롭(Dropping): 구독자가 처리할 수 없는 데이터는 단순히 버리는 방법입니다. 메모리 사용량을 감소시키지만 데이터 손실이 발생할 수 있습니다.
-
에러 발생: 구독자가 처리할 수 없는 데이터가 발생하면 에러를 발생시키는 방법입 니다. 이러한 경우, 소스가 에러를 발생시키면서 구독이 종료될 수 있습니다.
-
최신 데이터 유지: 구독자가 처리할 준비가 될 때까지 최신 데이터만 유지하고, 나머지 데이터는 버리는 방법입니다.
배압을 사용하는 시점은 데이터 생산 속도와 소비 속도가 불균형하게 발생하는 경우입니다. 특히 대량의 데이터를 처리하거나 소스가 빠른 데이터를 생성하는 상황에서는 배압을 고려해야 합니다. 일반적으로 I/O 바운드 작업이나 외부 API 호출과 같은 작업들은 데이터 생산과 소비 속도가 다르기 때문에 배압 처리가 필요할 수 있습니다.
배압을 처리하는 방법은 상황에 따라 다르며, 스프링 WebFlux에서 제공하는 다양한 배압 전략을 활용하여 적절한 처리 방식을 선택할 수 있습니다. 애플리케이션의 특성과 요구사항에 맞게 적절한 배압 전략을 선택하여 리액티브 애플리케이션의 안정성과 성능을 보장하는 것이 중요합니다.
RestApi 대용량 업로드시 배압
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@RestController
public class FileUploadController {
@PostMapping(value = "/upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Mono<Void> uploadFile(@RequestPart("file") Flux<FilePart> filePartFlux, ServerHttpResponse response) {
response.setStatusCode(HttpStatus.OK);
// File to save the uploaded content
File file = new File("uploaded_file.txt");
// Write the content of the Flux<DataBuffer> to the file
FileOutputStream outputStream;
try {
outputStream = new FileOutputStream(file);
} catch (IOException e) {
return Mono.error(e);
}
return filePartFlux
.flatMap(part -> part.content().doOnNext(dataBuffer -> {
try {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
outputStream.write(bytes);
} catch (IOException e) {
throw new RuntimeException("Error writing file.", e);
} finally {
dataBuffer.release();
}
}).doOnError(e -> {
try {
outputStream.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}))
.then(Mono.fromRunnable(() -> {
try {
outputStream.close();
} catch (IOException e) {
throw new RuntimeException("Error closing file stream.", e);
}
}))
.onBackpressureBuffer() // Backpressure 처리를 위해 onBackpressureBuffer() 메서드 사용
.then();
}
}
이 예제에서는 파일 업로드를 처리하기 위해 @RequestPart 애노테이션을 사용하고, Flux FilePart를 받습니다. Flux FilePart는 파일을 비동기적으로 읽는 Flux DataBuffer를 포함합니다. onBackpressureBuffer() 메서드를 사용하여 Backpressure를 처리합니다. 이로 인해 서버와 클라이언트 간의 데이터 흐름을 조절하고, 메모리 오버플로우와 같은 문제를 예방할 수 있습니다.
물론 실제 파일 업로드 시에는 보안 및 파일 유효성 검사와 같은 추가적인 로직이 필요할 수 있습니다. 예제 코드는 파일을 덮어쓰기할 수도 있으므로 실제 상용 환경에서는 파일 이름을 랜덤하게 생성하거나, 중복을 방지하는 방법을 고려해야 합니다. 또한, 오류 처리와 파일 처리 완료 후의 클린업 로직 등도 추가해야 합니다.
백프레셔를 사용하는 예제
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
public class BulkUploadController {
private final WebClient minioClient;
public BulkUploadController(WebClient.Builder webClientBuilder) {
this.minioClient = webClientBuilder.baseUrl("http://minio-server-url")
.build();
}
@PostMapping(value = "/bulk-upload", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public Mono<Void> bulkUpload(@RequestPart("file") Flux<byte[]> filePartFlux,
ServerHttpResponse response) {
response.setStatusCode(HttpStatus.OK);
// Upload each part to MinIO
return filePartFlux
.onBackpressureBuffer() // Backpressure 처리를 위해 onBackpressureBuffer() 메서드 사용
.flatMap(part -> {
// Replace "bucketName" and "objectName" with appropriate values
return minioClient.put()
.uri("/bucketName/objectName")
.body(BodyInserters.fromDataBuffers(Flux.just(part)))
.exchange()
.flatMap(clientResponse -> clientResponse.bodyToMono(Void.class));
})
.then();
}
}
위의 예제에서 WebClient를 사용하여 MinIO 서버로 비동기적으로 파일을 업로드합니다. onBackpressureBuffer() 메서드를 사용하여 스프링 WebFlux의 배압을 고려하여 파일을 업로드하므로, 대량 혹은 대용량 파일의 안정적인 업로드가 가능합니다.
minioClient.put() 메서드를 사용하여 MinIO 서버에 PUT 요청을 보내고, body() 메서드를 사용하여 파일 내용을 전달합니다. MinIO의 엔드포인트인 http://minio-server-url과 버킷 이름, 오브젝트 이름을 적절한 값으로 대체하여 사용하십시오.
실제 운영 환경에서는 파일 저장 경로, 보안, 예외 처리, MinIO 접속 정보 등을 추가하여 완전한 기능을 구현해야 합니다. 이 예제는 MinIO에 바로 업로드하는 방법을 보여주는 것이며, 다른 요구사항에 따라 더 많은 기능을 추가할 수 있습니다.
.onBackpressureBuffer() 사용효과
.onBackpressureBuffer() 메서드는 리액티브 스트림에서 발생하는 배압(Backpressure)을 처리하는 데 사용되는 메서드입니다. 배압은 데이터 생산과 데이터 처리 사이의 불균형으로 인해 발생하는 상황을 다루기 위한 개념입니다. 데이터 생산자가 데이터를 더 빠르게 생산하는 경우, 데이터 소비자가 처리를 따라잡지 못하여 메모리 누수나 애플리케이션 장애를 유발할 수 있습니다. 이런 상황에서 .onBackpressureBuffer()를 사용하여 Backpressure를 처리하면 어떤 효과가 있는지 알아보겠습니다.
.onBackpressureBuffer() 메서드의 효과는 다음과 같습니다:
- 배압 처리: .onBackpressureBuffer() 메서드를 사용하면 데이터 생산과 데이터 처리 사이의 불균형 상황에서 데이터를 버퍼링하여 처리합니다. 데이터 생산 속도가 데이터 처리 속도보다 빠르면, 버퍼에 데이터를 저장하여 메모리 누수를 방지하고, 처리 속도에 따라 데이터를 조절하여 데이터 소비자가 처리할 수 있을 때까지 대기시킵니다.
- 메모리 관리: .onBackpressureBuffer()는 배압이 발생할 때 버퍼를 사용하여 대기하므로, 메모리 관리를 효율적으로 처리합니다. 데이터를 처리할 수 없는 상황에서 데이터를 처리 가능한 속도로 제어하여 메모리 오버플로우를 방지합니다.
- 백프레셔의 역압 요청: 배압이 발생하면 데이터 생산자에게 데이터를 보내는 것을 제한하는 역압 요청을 보냅니다. 데이터 생산자는 역압 요청에 따라 데이터를 적절하게 제어하여 데이터 소비자가 처리할 수 있는 속도로 데이터를 전송합니다.
.onBackpressureBuffer()를 사용하여 배압을 처리하면, 데이터 처리 속도와 데이터 생산 속도 사이의 불균형 상황을 안정적으로 관리할 수 있습니다. 이를 통해 리액티브 스트림의 안정성과 성능을 향상시킬 수 있습니다. 하지만 버퍼링을 사용하므로 메모리 사용량에도 영향을 미칠 수 있으므로, 적절한 버퍼 크기 설정과 메모리 관리가 필요합니다. 실제 상황에서는 사용 환경과 요구사항에 따라 적절한 배압 전략을 선택하여 리액티브 애플리케이션을 구현하는 것이 중요합니다.
.onBackpressureBuffer() 를 사용할 때 별도 지정할것?
.onBackpressureBuffer()를 사용할 때는 추가적으로 지정해주어야 할 몇 가지 요소들이 있습니다. 주요한 것들을 아래에 설명드리겠습니다:
버퍼 크기 지정: .onBackpressureBuffer()는 데이터를 버퍼링하여 대기시키기 때문에, 버퍼의 크기를 명시적으로 지정해야 합니다. 이를 통해 메모리 사용량을 제어하고 적절한 버퍼 크기를 유지할 수 있습니다. 예를 들어, .onBackpressureBuffer(100)과 같이 버퍼 크기를 100으로 설정하면, 최대 100개의 데이터를 버퍼에 저장하여 대기시킵니다.
오류 처리: .onBackpressureBuffer()를 사용하면서 데이터를 버퍼링하는 동안 오류가 발생할 수 있습니다. 예를 들어, 버퍼가 가득 차서 더 이상 데이터를 버퍼에 저장할 수 없는 상황에서 오류가 발생할 수 있습니다. 이러한 상황에 대한 오류 처리를 추가해주어야 합니다. .onBackpressureBuffer() 메서드는 더 이상 버퍼에 저장할 수 없을 때 IllegalStateException을 발생시키며, 이에 대한 처리를 해주어야 합니다.
요청 전략 지정: .onBackpressureBuffer()는 요청 전략을 선택적으로 지정할 수 있습니다. 기본적으로는 아이템 하나씩 요청하며, 이때 버퍼 크기를 초과하는 아이템은 버립니다. 이 외에도 .onBackpressureBuffer의 다른 오버로드된 버전을 사용하여 원하는 요청 전략을 선택할 수 있습니다.
예시 코드에서는 다음과 같이 사용됩니다:
filePartFlux.onBackpressureBuffer(100) // 버퍼 크기를 100으로 지정
.flatMap(part -> {
// 데이터를 버퍼에 저장하고 처리 로직 수행
})
.doOnError(throwable -> {
// 배압이 발생했을 때 오류 처리
})
.then();
.onBackpressureBuffer() 사용할 때 버퍼의 용량은 무엇으로 결정??
.onBackpressureBuffer() 메서드를 사용할 때 버퍼의 용량은 capacity 매개변수를 통해 결정됩니다. capacity는 버퍼의 크기를 나타내며, 이 값에 따라 버퍼가 최대로 저장할 수 있는 아이템의 개수가 결정됩니다. 버퍼의 용량을 지정함으로써 데이터 생산과 데이터 처리 간의 불균형 상황에서 메모리 사용을 제어하고, 안정적으로 배압을 처리할 수 있도록 합니다.
.onBackpressureBuffer() 메서드의 시그니처는 다음과 같습니다:
public final Flux<T> onBackpressureBuffer()
public final Flux<T> onBackpressureBuffer(int capacity)
public final Flux<T> onBackpressureBuffer(int capacity, boolean errorOnOverflow)
여기서 capacity는 버퍼의 크기를 나타내며, errorOnOverflow는 버퍼에 더 이상 아이템을 저장할 수 없을 때 오류를 발생시킬지 여부를 결정하는 불리언 값입니다.
capacity: 버퍼의 용량을 설정합니다. 버퍼에 저장할 수 있는 아이템의 최대 개수입니다. errorOnOverflow: 버퍼에 더 이상 아이템을 저장할 수 없을 때 오류를 발생시킬지 여부를 결정합니다. true로 설정하면 버퍼 용량을 초과하는 경우 IllegalStateException이 발생합니다. false로 설정하면 버퍼 용량을 초과하는 아이템은 버립니다. 예를 들어, .onBackpressureBuffer(100, true)와 같이 사용하면 버퍼의 용량은 100이며, 버퍼가 가득 찬 상태에서 더 이상 아이템을 저장할 수 없을 때 IllegalStateException이 발생합니다.
주의: 버퍼 용량을 지정할 때는 메모리 사용을 고려하여 적절한 값을 선택해야 합니다. 너무 작은 용량을 지정하면 너무 빈번한 오버플로우가 발생할 수 있고, 너무 큰 용량을 지정하면 메모리 소비가 증가할 수 있습니다. 실제 상황에서는 용량을 조정하면서 효율적으로 배압을 처리할 수 있도록 설정해야 합니다.
버퍼의 용량 100"이라는 표현은 아이템의 개수를 의미하며, 사용되는 데이터의 유형과 크기에 따라 다를 수 있습니다. 버퍼의 용량은 한 번에 처리할 수 있는 데이터 아이템의 개수를 지정하는 것으로, 이는 애플리케이션의 요구사항과 메모리 사용에 따라 결정됩니다.
예를 들어, "버퍼의 용량 100"이라고 한다면, 해당 버퍼에는 최대 100개의 데이터 아이템을 저장할 수 있다는 뜻입니다. 데이터 아이템의 크기에 따라서 실제로 사용되는 메모리 양이 달라질 수 있습니다. 예를 들어, 각 데이터 아이템이 1KB 크기를 가진다면, 100개의 아이템을 저장하는 데에는 약 100KB의 메모리가 필요합니다.
이와 같은 버퍼의 크기는 애플리케이션의 환경과 요구사항에 따라 결정되어야 합니다. 대량 혹은 대용량 데이터를 처리해야 할 경우에는 충분한 버퍼 크기를 확보하여 데이터를 안정적으로 처리하는 것이 중요합니다. 그러나 불필요하게 큰 버퍼를 사용하는 경우에는 메모리 사용량이 증가하여 성능에 영향을 미칠 수 있으므로 적절한 값을 선택하는 것이 중요합니다.
따라서 버퍼의 용량을 결정할 때에는 애플리케이션의 데이터 특성, 요구사항, 메모리 제한 등을 고려하여 적절한 크기를 설정해야 합니다. 실험과 모니터링을 통해 적절한 버퍼 크기를 찾는 것이 권장되는 접근 방법입니다.
버퍼의 사용량과 시스템의 허용량 확인
.onBackpressureBuffer()를 사용할 때 버퍼의 사용량과 시스템의 허용량을 확인하고 조절하는 방법은 다음과 같습니다:
-
버퍼의 사용량 확인: .onBackpressureBuffer()를 사용하여 버퍼를 생성하면, 해당 버퍼는 내부적으로 Ring Buffer(환형 버퍼) 형태로 구현됩니다. 이 때 Ring Buffer의 현재 사용량과 용량(크기)를 확인하고 조절하는 방법은 라이브러리마다 다소 차이가 있을 수 있습니다. 일반적으로 .onBackpressureBuffer()를 사용한 Flux에서는 버퍼의 사용량을 추적하는 방법을 제공하지 않습니다. 하지만 Reactor에서는 Flux에 대한 sink를 사용하여 직접 버퍼의 사용량을 추적할 수 있습니다. 예를 들어, Flux.create()를 사용하여 Flux를 생성하고 FluxSink를 통해 데이터를 버퍼에 직접 저장하면서 사용량을 확인할 수 있습니다.
-
시스템의 허용량 확인과 조절: 시스템의 허용량은 시스템의 리소스, 메모리, 처리 능력 등에 따라 결정됩니다. 이를 확인하고 조절하는 방법은 다음과 같습니다:
-
메모리 사용량 모니터링: 애플리케이션에서 사용하는 메모리의 양을 모니터링하고, 버퍼가 사용하는 메모리와 전체 메모리 사용량을 비교하여 메모리 누수를 확인할 수 있습니다. Java의 경우 메모리 프로파일링 도구를 사용하여 메모리 사용량을 모니터링할 수 있습니다.
-
시스템 리소스 모니터링: 시스템의 CPU 사용량, 디스크 I/O, 네트워크 사용량 등을 모니터링하여 시스템의 허용량을 확인할 수 있습니다.
-
버퍼의 용량 조절: .onBackpressureBuffer()의 capacity 매개변수를 조절하여 버퍼의 용량을 변경할 수 있습니다. 적절한 용량을 선택하여 메모리 사용량과 시스템의 허용량을 고려하여 배압을 처리할 수 있도록 합니다.
위의 방법들을 사용하여 버퍼의 사용량과 시스템의 허용량을 모니터링하고 조절함으로써 대량 혹은 대용량 데이터를 처리하는 과정에서 안정성과 성능을 향상시킬 수 있습니다. 실제 운영 환경에서는 실험과 모니터링을 통해 적절한 설정을 찾는 것이 중요하며, 성능 테스트를 통해 시스템의 허용량을 확인하는 것도 좋은 방법입니다.