Reactive Programing 용어 정리 : https://wyjj.tistory.com/93
Reactor란?
리액티브 스트림즈 표준 사양을 구현한 구현체 중 하나이다. Spring 5 버전부터 지원하는 리액티브 스택에 포함되어 리액티브한 애플리케이션으로 동작하는데 핵심적인 역할을 하는 라이브러리이다.
기본적으로 요청 쓰레드가 차단되지 않는 Non-Blocking 통신을 지원하여 MSA(Microservice Architecture) 구조에 적합한 라이브러리이다.
Reactor는 Publisher 타입으로 Mono[0|1]와 Flux[N]이라는 두 가지 타입을 제공한다. Mono[0|1]은 0건 또는 1건의 데이터를 emit 할 수 있고, Flux[N]은 N개의 데이터를 emit 할 수 있음을 의미한다.
Reactor 구성 요소
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class HelloReactiveExample {
public static void main(String[] args) throws InterruptedException {
Flux // Reactor Sequence의 시작점, N개의 데이터를 emit
.just("Hello", "Reactor") // 데이터를 emit하는 Publisher 역할
.map(message -> message.toUpperCase()) // 데이터를 가공하는 Operator 역할
.publishOn(Schedulers.parallel()) // 쓰레드 관리자 역할의 Scheduler를 지정하는 Operator 역할. 이 메서드를 기준으로 Downstream의 쓰레드를 변경할 수 있다.
.subscribe(System.out::println, // Publisher가 emit한 데이터를 처리
error -> System.out.println(error.getMessage()), // 에러를 전달 받아 처리하는 역할
() -> System.out.println("# onComplete")); // Sequence가 종료된 후 후처리하는 역할
// Scheduler로 지정한 쓰레드는 데몬 쓰레드이기 때문에, main 쓰레드가 종료되면 동시에 종료된다.
// 따라서 main 쓰레드를 0.1초 딜레이시켜 데몬 쓰레드가 동작할 시간을 만들어준다.
Thread.sleep(100L);
}
}
마블 다이어그램(Marble Diagram)
마블 다이어그램이란, 동그라미는 하나의 데이터를 의미하며 다이어그램 상에서 시간의 흐름에 따라 변화하는 데이터의 흐름을 표현을 표현한 다이어그램이다.
Flux의 마블 다이어그램
위의 타임 라인은 Sequence가 시작되는 것을 표현한 것이며 동그라미는 데이터가 emit되는 것을 의미하고, 수직 바는 Squence가 정상 종료됨을 알 수 있다.
아래의 타임 라인은 Operator에서 가공 처리된 데이터가 Downstream으로 전달될 때의 타임라인이며 X 표시는 에러가 발생한 것을 의미한다.
스케줄러(Scheduler)
스케줄러는 Reactor Sequence 상에서 처리되는 동작들을 하나 이상의 쓰레드에서 동작하도록 별도의 쓰레드를 제공해 주는 것이다.
Non-Blocking 통신 비동기프로그래밍을 위한 중요한 역할을 한다. 쓰레드를 관리하는 관리자 역할을 한다.
Scheduler를 추가하지 않을 경우
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class SchedulersExample01 {
public static void main(String[] args) {
Flux
.range(1, 10) // publisher. 1~10까지의 숫자를 emit
.filter(n -> n % 2 == 0) // Operator. 짝수만 필터링
.map(n -> n * 2) // Operator. 2를 곱함
.subscribe(data -> log.info("# onNext: {}", data)); // emit한 데이터를 처리
}
}
// 출력
12:55:24.957 [main] INFO schedulers.SchedulersExample01 - # onNext: 4
12:55:24.958 [main] INFO schedulers.SchedulersExample01 - # onNext: 8
12:55:24.958 [main] INFO schedulers.SchedulersExample01 - # onNext: 12
12:55:24.958 [main] INFO schedulers.SchedulersExample01 - # onNext: 16
12:55:24.958 [main] INFO schedulers.SchedulersExample01 - # onNext: 20
main 쓰레드에서만 실행되는 것을 확인할 수 있다.
subscribeOn() Operator
subscribeon()은 데이터 소스에서 데이터를 emit하는 원본 Publisher의 실행 쓰레드를 지정하는 역할을 한다.
주로 Schedulers.boundedElastic() 쓰레드를 사용한다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@Slf4j
public class SchedulersExample02 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(1, 10)
.subscribeOn(Schedulers.boundedElastic()) // 구독 직 후 실행되는 쓰레드가 main 쓰레드에서 지정한 쓰레드로 변경된다.
.doOnSubscribe(subscription -> log.info("# doOnSubscribe")) // 구독 직 후에 트리거 되는 Operator
.filter(n -> n % 2 == 0)
.map(n -> n * 2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
// 출력
13:51:51.897 [main] INFO schedulers.SchedulersExample02 - # doOnSubscribe
13:51:51.902 [boundedElastic-1] INFO schedulers.SchedulersExample02 - # onNext: 4
13:51:51.903 [boundedElastic-1] INFO schedulers.SchedulersExample02 - # onNext: 8
13:51:51.904 [boundedElastic-1] INFO schedulers.SchedulersExample02 - # onNext: 12
13:51:51.904 [boundedElastic-1] INFO schedulers.SchedulersExample02 - # onNext: 16
13:51:51.904 [boundedElastic-1] INFO schedulers.SchedulersExample02 - # onNext: 20
subscribeOn() Operator가 구독 시점 직 후 실행되는 작업인 range() Operator 처럼 데이터를 생성하고, 생성한 데이터를 emit하는 작업의 실행 쓰레드를 지정한다. 때문에 doOnSubscribe() Operator에서 출력되는 로그가 main 쓰레드에서 실행되고, range() Operator에서 데이터를 Emit하는 단계부터는 boundedElastic-1 쓰레드에서 실행된다.
publishOn() Operator
publishOn()은 전달 받은 데이터를 가공 처리하는 Operator 앞에 추가해서 실행 쓰레드를 별도로 추가하는 역할을 한다.
주로 Schedulers.parallel() 쓰레드를 사용한다.
@Slf4j
public class SchedulersExample03 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(1, 10)
.subscribeOn(Schedulers.boundedElastic())
.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
.publishOn(Schedulers.parallel()) // Downstream 쪽 쓰레드가 지정한 쓰레드로 변경
.filter(n -> n % 2 == 0)
.doOnNext(data -> log.info("# filter doOnNext")) // 바로 앞의 Operator가 실행될 때 트리거되는 Operator
.publishOn(Schedulers.parallel())
.map(n -> n * 2)
.doOnNext(data -> log.info("# map doOnNext"))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(100L);
}
}
// 출력
14:18:10.652 [main] INFO schedulers.SchedulersExample03 - # doOnSubscribe
14:18:10.660 [parallel-2] INFO schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-2] INFO schedulers.SchedulersExample03 - # filter doOnNext
14:18:10.660 [parallel-1] INFO schedulers.SchedulersExample03 - # map doOnNext
14:18:10.660 [parallel-1] INFO schedulers.SchedulersExample03 - # onNext: 4
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # onNext: 8
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # onNext: 12
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # onNext: 16
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # map doOnNext
14:18:10.662 [parallel-1] INFO schedulers.SchedulersExample03 - # onNext: 20
doOnNext()는 doOnNext() 바로 앞에 위치한 Operator가 실행될 때, 트리거 되는 Operator이다.
publishOn() Operator가 fiter()와 map() Operator 앞에 추가되면서 publishOn()이 추가될 때 마다 실행 쓰레드가 바뀌는 것을 확인할 수 있다.
Operators
Reactor는 상당히 많은 상황별 Operator를 제공한다.
상황별로 분류된 Operator 목록 리뷰
- 새로운 Sequence를 생성(Creating)하고자 할 경우
- just()
- ⭐ fromStream()
- ⭐ fromIterable()
- fromArray()
- range()
- interval()
- empty()
- never()
- defer()
- using()
- generate()
- ⭐ create()
- 기존 Sequence에서 변환 작업(Transforming)이 필요한 경우
- ⭐ map()
- ⭐ flatMap()
- ⭐ concat()
- collectList()
- collectMap()
- merge()
- ⭐ zip()
- then()
- switchIfEmpty()
- and()
- when()
- Sequence 내부의 동작을 확인(Peeking)하고자 할 경우
- doOnSubscribe
- ⭐doOnNext()
- doOnError()
- doOnCancel()
- doFirst()
- doOnRequest()
- doOnTerminate()
- doAfterTerminate()
- doOnEach()
- doFinally()
- ⭐log()
- Sequence에서 데이터 필터링(Filtering)이 필요한 경우
- ⭐filter()
- ignoreElements()
- distinct()
- ⭐take()
- next()
- skip()
- sample()
- single()
- 에러를 처리(Handling errors)하고자 할 경우
- ⭐error()
- ⭐timeout()
- onErrorReturn()
- onErrorResume()
- onErrorMap()
- doFinally()
- ⭐retry()
1. 새로운 Sequence를 생성(Creating)하고자 할 경우
forStream()
import reactor.core.publisher.Flux;
import java.util.stream.Stream;
public class FromStreamExample01 {
public static void main(String[] args) {
Flux
.fromStream(Stream.of(200, 300, 400, 500, 600))
.reduce((a, b) -> a + b)
.subscribe(System.out::println);
}
}
// 출력
2000
fromStream()은 전달받은 Stream이 포함하고 있는 데이터를 차례대로 emit 한다.
reduce() Operator는 Upstream에서 emit된 두 개의 데이터를 순차적으로 누적 처리할 수 있는 Operator이다.
fromIterable()
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.stream.Stream;
@Slf4j
public class FromIterableExample01 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.coffeeList)
.subscribe(coffee -> log.info("{} : {}", coffee.getKorName(), coffee.getPrice()));
}
}
// 출력
09:48:25.328 [main] INFO FromIterableExample01 - 아메리카노 : 2500
09:48:25.329 [main] INFO FromIterableExample01 - 카페라떼 : 3500
09:48:25.329 [main] INFO FromIterableExample01 - 바닐라 라떼 : 4500
09:48:25.329 [main] INFO FromIterableExample01 - 카라멜 마끼아또 : 5500
09:48:25.329 [main] INFO FromIterableExample01 - 에스프레소 : 5000
fromIterable()은 어떤 데이터(아이템)가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는다.
create()
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.util.Arrays;
import java.util.List;
@Slf4j
public class CreateExample {
private static List<Integer> source= Arrays.asList(1, 3, 5, 7, 9, 11, 13, 15, 17, 19);
public static void main(String[] args) {
Flux.create((FluxSink<Integer> sink) -> {
// onRequest()는 파라미터인 람다 표현식을 실행한다.
sink.onRequest(n -> {
// source를 순회하며 next() 메서드로 원소를 emit 한다.
for (int i = 0; i < source.size(); i++) {
sink.next(source.get(i));
}
sink.complete(); // Squence를 종료하기 위한 complete() 메서드
});
// onDispose()는 Sequence가 완전히 종료되기 직전에 호출되어 후처리 작업을 한다.
sink.onDispose(() -> log.info("# clean up"));
}).subscribe(data -> log.info("# onNext: {}", data));
}
}
// 출력
10:26:22.231 [main] INFO CreateExample - # onNext: 1
10:26:22.232 [main] INFO CreateExample - # onNext: 3
10:26:22.232 [main] INFO CreateExample - # onNext: 5
10:26:22.232 [main] INFO CreateExample - # onNext: 7
10:26:22.232 [main] INFO CreateExample - # onNext: 9
10:26:22.232 [main] INFO CreateExample - # onNext: 11
10:26:22.232 [main] INFO CreateExample - # onNext: 13
10:26:22.232 [main] INFO CreateExample - # onNext: 15
10:26:22.233 [main] INFO CreateExample - # onNext: 17
10:26:22.233 [main] INFO CreateExample - # onNext: 19
10:26:22.233 [main] INFO CreateExample - # clean up
create() Operator는 프로그래밍 방식으로 직접 Signal 이벤트를 발생시키는 Operator이다. Publisher가 데이터를 emit 할 경우 발생되는 onNext Signal 이벤트와는 달리 한 번에 여러 건의 데이터를 비동기적으로 emit 하는데 사용한다.
creage()의 파라미터는 FluxSink라는 람다 파라미터를 가지는 람다 표현식이다. Flux나 Mono의 just(), fromIterable() 같은 데이터 생성 Operator를 사용하면 내부에서 emit하는 등의 Sequence를 진행하는 것과 달리 직접 Signal 이벤트를 발생 시켜 Sequence를 진행하도록 한다.
2. 기존 Sequence에서 변환 작업(Transforming)이 필요한 경우
flatMap()
flatMap()은 내부로 들어오는 데이터 한 건당 하나의 Sequence가 생성된다.
flatMap() 내부에 정의하는 Sequence를 Inner Sequence라고 부른다.
예제는 구구단으로 단이 바뀔 때 마다 쓰레드가 비동기적으로 동작하는 것을 볼 수 있다. 하지만 작업의 처리 순서는 보장하지 않는다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
@Slf4j
public class FlatMapExample01 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(2, 9) // 2부터 9까지의 숫자를 emit
.flatMap(dan -> Flux
.range(1, 9) // 1부터 9까지의 숫자를 emit
.publishOn(Schedulers.parallel()) // Inner Sequence를 처리할 쓰레드를 할당
.map(num -> dan + " x " + num + " = " + dan * num))
.subscribe(log::info);
Thread.sleep(100L);
}
}
// 출력
20:36:58.158 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:36:58.205 [parallel-1] INFO FlatMapExample01 - 2 x 1 = 2
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 2 = 4
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 3 = 6
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 4 = 8
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 5 = 10
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 6 = 12
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 7 = 14
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 8 = 16
20:36:58.206 [parallel-1] INFO FlatMapExample01 - 2 x 9 = 18
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 1 = 7
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 2 = 14
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 3 = 21
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 4 = 28
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 5 = 35
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 6 = 42
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 7 = 49
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 8 = 56
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 7 x 9 = 63
...
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 8 x 1 = 8
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 1 = 3
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 2 = 6
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 3 = 9
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 4 = 12
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 5 = 15
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 6 = 18
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 7 = 21
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 8 = 24
20:36:58.209 [parallel-7] INFO FlatMapExample01 - 3 x 9 = 27
concat()
concat()은 입력으로 전달하는 Publisher의 Sequence를 연결하여 차례대로 데이터를 emit 한다.
import reactor.core.publisher.Flux;
public class ConcatExample01 {
public static void main(String[] args) {
Flux
.concat(Flux.just("Monday", "Tuesday", "Wednesday", "Thursday", "Friday"),
Flux.just("Saturday", "Sunday"))
.subscribe(System.out::println);
}
}
// 출력
Monday
Tuesday
Wednesday
Thursday
Friday
Saturday
Sunday
zip()
zip()은 입력으로 전달되는 여러 개의 Publisher Sequence에서 emit된 데이터를 결합하는 Operator이다.
각각의 다른 Sequence에서 emit되는 데이터 중에서 같은 index의 데이터들이 결합된다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class ZipExample01 {
public static void main(String[] args) throws InterruptedException {
Flux<Long> source1 = Flux.interval(Duration.ofMillis(200L)).take(4);
Flux<Long> source2 = Flux.interval(Duration.ofMillis(400L)).take(6);
Flux
.zip(source1, source2, (data1, data2) -> data1 + data2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3000L);
}
}
// 출력
16:39:56.011 [parallel-2] INFO ZipExample01 - # onNext: 0
16:39:56.411 [parallel-2] INFO ZipExample01 - # onNext: 2
16:39:56.812 [parallel-2] INFO ZipExample01 - # onNext: 4
16:39:57.211 [parallel-2] INFO ZipExample01 - # onNext: 6
interval() Operator는 파라미터로 전달한 시간(Duration.ofMillis(…))을 주기로 해서 0부터 1씩 증가한 숫자를 emit하는 Operator이다. take() 메서드로 지정한 숫자만큼만 증가하도록 하였다.
source1, 2의 Sequence는 emit하는 시점과 데이터의 갯수가 다르지만, zip()은 같은 index만 결합하기 때문에 총 4개의 데이터만 결합된다.
3. Sequence 내부의 동작을 확인(Peeking)하고자 할 경우
doOnNext()
doOnNext()는 데이터 emit 시 트리거 되어 부수 효과(side-effect)를 추가할 수 있는 Operator이다. 함수형 프로그래밍 세계에서 부수 효과는 어떤 동작을 실행하되 리턴 값이 없는 것을 의미한다.
doOnNext()는 주로 로깅에 사용되지만 데이터를 emit 하면서 필요한 추가 작업도 처리할 수 있다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class DoOnNextExample01 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.coffeeList)
.doOnNext(coffee -> validateCoffee(coffee)) // emit 되는 데이터의 유효성을 검증
.subscribe(data -> log.info("{} : {}", data.getKorName(), data.getPrice()));
}
private static void validateCoffee(Coffee coffee) {
if (coffee == null) {
throw new RuntimeException("Not found coffee");
}
// TODO 유효성 검증에 필요한 로직을 필요한 만큼 추가할 수 있습니다.
}
}
log()
log()는 Publisher에서 발생하는 Signal 이벤트를 로그로 출력해주는 역할을 한다.
import reactor.core.publisher.Flux;
import java.util.stream.Stream;
public class LogExample01 {
public static void main(String[] args) {
Flux
.fromStream(Stream.of(200, 300, 400, 500, 600))
.log()
.reduce((a, b) -> a + b)
.log()
.subscribe(System.out::println);
}
}
// 출력
21:11:37.850 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:11:37.875 [main] INFO reactor.Flux.Stream.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
21:11:37.875 [main] INFO reactor.Mono.Reduce.2 - | onSubscribe([Fuseable] MonoReduce.ReduceSubscriber)
21:11:37.876 [main] INFO reactor.Mono.Reduce.2 - | request(unbounded)
21:11:37.876 [main] INFO reactor.Flux.Stream.1 - | request(unbounded)
21:11:37.876 [main] INFO reactor.Flux.Stream.1 - | onNext(200)
21:11:37.876 [main] INFO reactor.Flux.Stream.1 - | onNext(300)
21:11:37.876 [main] INFO reactor.Flux.Stream.1 - | onNext(400)
21:11:37.876 [main] INFO reactor.Flux.Stream.1 - | onNext(500)
21:11:37.876 [main] INFO reactor.Flux.Stream.1 - | onNext(600)
21:11:37.876 [main] INFO reactor.Flux.Stream.1 - | onComplete()
21:11:37.876 [main] INFO reactor.Mono.Reduce.2 - | onNext(2000)
2000
21:11:37.876 [main] INFO reactor.Mono.Reduce.2 - | onComplete()
구독 시점에 onSubscribe Signal 이벤트가 발생했다.
데이터 요청 시, request Signal 이벤트가 발생했다.
Publisher가 데이터를 emit 할 때 onNext Signal 이벤트가 발생했다. (총 5번)
emit이 종료되고 onComplete Signal 이벤트가 발생했다.
reduce() Operator의 데이터가 emit 되면서 onNext Signal 이벤트가 발생하고, 완료 후 onComplete 이벤트가 발생했다.
4. 에러를 처리(Handling errors)하고자 할 경우
error()
error()는 의도적으로 onError Signal 이벤트를 발생시킬 때 사용할 수 있는 Operator이다.
Spring MVC의 Exception Throw와 동일한 목적이다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
@Slf4j
public class ErrorExample01 {
public static void main(String[] args) {
Mono.justOrEmpty(findVerifiedCoffee())
.switchIfEmpty(Mono.error(new RuntimeException("Not found coffee")))
.subscribe(
data -> log.info("{} : {}", data.getKorName(), data.getPrice()),
error -> log.error("# onError: {}", error.getMessage()));
}
private static Coffee findVerifiedCoffee() {
return null;
}
}
// 출력
18:11:29.762 [main] ERROR ErrorExample01 - # onError: Not found coffee
justOrEmpty() Operator는 파라미터로 전달되는 데이터소스가 null 이어도 에러가 발생하지 않는다.
switchIfEmpty() Operator는 Upstream에서 전달되는 데이터가 null이면 대체 동작을 수행할 수 있다.
findVerifiedCoffee() 메서드에서 일부로 null을 리턴하여 error 가 출력되는 것을 볼 수 있다.
timeout(), retry()
timeout()은 입력으로 주어진 시간동안 emit 되는 데이터가 없으면 onError Signal 이벤트를 발생시킨다.
retry()는 Sequence 상에서 에러가 발생할 경우, 입력으로 주어진 숫자만큼 재구독해서 Sequence를 다시 시작한다.
두 Operator는 함께 사용하는 경우가 많다.
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.stream.Collectors;
@Slf4j
public class TimeoutRetryExample01 {
public static void main(String[] args) throws InterruptedException {
getCoffees()
.collect(Collectors.toSet()) // timeout 되기 전의 emit된 중복 데이터를 제거하기 위함
.subscribe(bookSet -> bookSet
.stream()
.forEach(data ->
log.info("{} : {}", data.getKorName(), data.getPrice())));
Thread.sleep(12000);
}
private static Flux<Coffee> getCoffees() {
final int[] count = {0};
return Flux
.fromIterable(SampleData.coffeeList)
.delayElements(Duration.ofMillis(500)) // 0.5초에 한번씩 emit
.map(coffee -> {
try {
count[0]++;
if (count[0] == 3) { // 3번째 커피에서 2초 delay 추가
Thread.sleep(2000);
}
} catch (InterruptedException e) {
}
return coffee;
})
.timeout(Duration.ofSeconds(2)) // 2초안에 데이터가 emit 되지 않으면 onError 발생
.retry(1) // 1회 재구독하여 Sequence 다시 실행
.doOnNext(coffee -> log.info("# getCoffees > doOnNext: {}, {}",
coffee.getKorName(), coffee.getPrice()));
}
}
// 출력
21:42:13.852 [parallel-2] INFO TimeoutRetryExample01 - # getCoffees > doOnNext: 아메리카노, 2500
21:42:14.363 [parallel-4] INFO TimeoutRetryExample01 - # getCoffees > doOnNext: 카페라떼, 3500
21:42:16.872 [parallel-6] DEBUG reactor.core.publisher.Operators - onNextDropped: com.codestates.coffee.Coffee@155bbfd4
21:42:16.873 [parallel-8] INFO TimeoutRetryExample01 - # getCoffees > doOnNext: 아메리카노, 2500
21:42:17.379 [parallel-2] INFO TimeoutRetryExample01 - # getCoffees > doOnNext: 카페라떼, 3500
21:42:17.884 [parallel-4] INFO TimeoutRetryExample01 - # getCoffees > doOnNext: 바닐라 라떼, 4500
21:42:18.388 [parallel-6] INFO TimeoutRetryExample01 - # getCoffees > doOnNext: 카라멜 마끼아또, 5500
21:42:18.891 [parallel-8] INFO TimeoutRetryExample01 - # getCoffees > doOnNext: 에스프레소, 5000
21:42:18.892 [parallel-8] INFO TimeoutRetryExample01 - 카페라떼 : 3500
21:42:18.893 [parallel-8] INFO TimeoutRetryExample01 - 카라멜 마끼아또 : 5500
21:42:18.893 [parallel-8] INFO TimeoutRetryExample01 - 아메리카노 : 2500
21:42:18.893 [parallel-8] INFO TimeoutRetryExample01 - 에스프레소 : 5000
21:42:18.893 [parallel-8] INFO TimeoutRetryExample01 - 바닐라 라떼 : 4500
delayElements() 메서드로 emit을 0.5초씩 지연시키고, map()을 통해 3번째(count [3]) 커피가 들어올 경우 2초 delay를 추가하여 총 2.5초가 지연되도록 하였다. timeout()으로 2초안에 데이터가 emit 되지 않으면 onError가 발생하도록 하였지만, retry()를 1로 지정하여 한번 더 Sequence를 시작하도록 하였다.
실행 결과, 세 번째 데이터가 time out 되어 Drop 되었지만 재구독 처리가 되어 emit이 된 것을 볼 수 있다.
'Java > reator' 카테고리의 다른 글
[Spring WebFlux] 리액티브 프로그래밍이란? (0) | 2022.10.12 |
---|