자바 스트림 | 리액티브 스트림 |
동기화, 한정된 데이터 작업 수행 | 무한 데이터셋 비동기 처리 지원 실시간 데이터 처리, 백 프레셔: 데이터 전달 폭주 방지 |
10.3.1~10.3.2 워니
반응형 프로그래밍
이걸 깨닫고 나면 새로운 세상이 열린다.
https://www.youtube.com/watch?v=KDiE5qQ3bZI
코딩 중 오류의 소지가 될 수 있는
변수의 사용을 지양!
순수함수를 사용해서 프로그래밍 하는 것!
어떤 데이터를 | |
특정 기준으로 걸러낸 다음 | |
최초 몇개 만 추려내고 |
|
특정 형식으로 변환한 다음 | |
한 문자열로 합치는 것 함수형이 아닌 걸로 |
|
값이 바뀔 수 있는 변수를 사용했으나 이제 사용하지 않는다 소프트웨어가 복잡해지면 오류가 생기겠죠? 코드에 노출된 변수들은 위험합니다. |
|
3.1 리액티브 타입 생성하기
리액티브 타입 생성하기
리퍼지터리나 서비스로부터
Flux나 Mono가 제공됨.
리액티브 타입을 생성할 필요가 없다.
데이터를 방출하는 새로운 리액티브 publisher를 생성해야한다.
리액터는 flux나 Mono를 생성하는 오퍼레이션을 제공한다.
오퍼레이션
1-1) 객체로부터 Flux 생성하기 : just()
Flux나 Mono 로 생성하려는 하나 이상의 객체가 있다면
Flux나 Mono의 just() 메서드를 사용하여
리액티브 타입을 생성할 수 있다.
다섯개의 String 객체 --> Flux를 생성한다
//다섯개의 String 객체 --> Flux를 생성
@Test
public void createAFlux_just(){
Flux<String> fruitFlux = Flux.just("사과", "오렌지", "포도", "바나나", "딸기");
}
이 경우 Flux는 생성되지만, 구독자가 없다.
구독자 없이는 데이터가 전달되지 않는다.
리액티브 타입을 구독하려면 데이터가 흘러갈 수 있게 하는 것이다.
2) Flux -> Subscriber 데이터 전달
//구독자 추가 -> subscribe 메서드 호출
fruitFlux.subscribe(f-> System.out.println("여기 과일: "+f));
java.util.Consumer이며
리액티브 스트림의 Subscriber 객체를 생성하기 위해 사용된다.
Subscribe()호출 -> 데이터 전달 시작!
Flux ------------------------데-이-터--전-달--시-작--------------------> Subscriber
3) StepVerifier 으로 테스트하기. (데이터 검사)
StepVerifier: 해당 리액티브 타입을 구독 -->
스트림을 통해 전달되는 데이터 assertion 적용함 -->
스트림이 기대대로 작동하는지 검사함
// StepVerifier: 구독 데이터 검사
StepVerifier.create(fruitFlux)
.expectNext("사과")
.expectNext("오렌지")
.expectNext("포도")
.expectNext("바나나")
.expectNext("딸기")
.verifyComplete();
StepVerifier가 fruitFlux를 구독한 후 --> 데이터 항목이 기대한 과일 이름과 일치하는지 --> 어서션을 적용함 -> fruitFlux가 완전한지 검사함
1-2) 컬렉션으로부터 Flux 생성하기 : fromArray()
Flux는 배열, Iterable 객체, 자바 Stream 객체로부터 생성될 수 있다.
배열 -> Flux 생성 : fromArray()
//다섯개의 String 객체 --> Flux를 생성
@Test
public void createAFlux_just(){
Flux<String> fruitFlux = Flux.just("사과", "오렌지", "포도", "바나나", "딸기");// 1
}
//배열 --> Flux를 생성
@Test
public void createAFlux_fromArray(){
String[] fruits = new String[]{ "사과", "오렌지", "포도", "바나나", "딸기" };
Flux<String> fruitFlux = Flux.fromArray(fruits);// 2
// StepVerifier: 구독 데이터 검사
StepVerifier.create(fruitFlux)
.expectNext("사과")
.expectNext("오렌지")
.expectNext("포도")
.expectNext("바나나")
.expectNext("딸기")
.verifyComplete();
}
// List ->
@Test
public void createAFlux_fromIterable(){
List<String> fruitList = new ArrayList<>();
fruitList.add("사과").add("오렌지").add("포도").add("바나나").add("딸기");
Flux<String> fruitFlux = Flux.fromIterable(fruitList);// 3
// StepVerifier: 구독 데이터 검사
StepVerifier.create(fruitFlux)
.expectNext("사과")
.expectNext("오렌지")
.expectNext("포도")
.expectNext("바나나")
.expectNext("딸기")
.verifyComplete();
}
//Stream 객체 ->
@Test
public void createAFlux_fromIterable(){
Stream<String> fruitStream = fruitStream.of("사과", "오렌지", "포도", "바나나", "딸기");
Flux<String> fruitFlux = Flux.fromStream(fruitStream);// 4
// StepVerifier: 구독 데이터 검사
StepVerifier.create(fruitFlux)
.expectNext("사과")
.expectNext("오렌지")
.expectNext("포도")
.expectNext("바나나")
.expectNext("딸기")
.verifyComplete();
}
4) Flux 데이터 생성: range()
데이터 없이 매번 새 값으로 증가하는 숫자를 방출하는
카운터 역할의 Flux만 필요한 경우
Static 메서드인 range()를 사용해라.
//Flux.range(1,5);
@Test
public void createAFlux_range(){
Flux<Inteager> intervalFF = Flux.range(1,5); //1~5 포함하는 카운터 Flux 생성
StepVerifier.create(intervalFF)//검사
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
//Flux.interval(시간).take();
@Test
public void createAFlux_range(){
Flux<Long> intervalFF = Flux.interval(Duration.ofSeconds(5));// Flux 생성
.take(5); // 항목은 5개로 한정
StepVerifier.create(intervalFF)//검사
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
3.2 리액티브 타입 조합하기
두개의 리액티브 타입을 결합
하나의 Flux를 ---> 두개 이상 리액티브 타입으로 분할
1) 결합 :
(1) mergeWith(): 주는대로 a소스, b소스 합쳐버림
두 개의 Flux 스트림 --> 하나 Flux
@Test
public void mergeFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa") // String -> Flux 객체 생성
.delayElements(Duration.ofMillis(500)); // 데이터 방출 속도 조절
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples") // String -> Flux 객체 생성
.delaySubscription(Duration.ofMillis(250)) // 스트리밍 시작 시간 미루기
.delayElements(Duration.ofMillis(500));// 데이터 방출 속도 조절
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux); // mergeWith 함수!!
StepVerifier.create(mergedFlux)
.expectNext("Garfield")
.expectNext("Lasagna")
.expectNext("Kojak")
.expectNext("Lollipops")
.expectNext("Barbossa")
.expectNext("Apples")
.verifyComplete();
}
(2) zip() : a소스, b소스를 합치든 뭔가 가공해서 C 소스로 flux객체 생성.
정적 오퍼레이션
zippedFlux 로부터 방출되는 항목: Tuple2
각 소스 Flux가 순서대로 방출하는 항목을 포함한다.
@Test
public void zipFluxes() {
Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");// String -> Flux 객체 생성
Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");// String -> Flux 객체 생성
Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux); //zip() 사용
StepVerifier.create(zippedFlux) // 검사
.expectNextMatches(p ->
p.getT1().equals("Garfield") &&
p.getT2().equals("Lasagna"))
.expectNextMatches(p ->
p.getT1().equals("Kojak") &&
p.getT2().equals("Lollipops"))
.expectNextMatches(p ->
p.getT1().equals("Barbossa") &&
p.getT2().equals("Apples"))
.verifyComplete();
}
Tupel2가 아닌 다른 타입을 사용하고 시픙면
원하는 객체를 zip()에 제공하면 된다.
@Test
public void zipFluxesToObject() {
Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa"); //Flux 객체 생성
Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples"); //Flux 객체 생성
Flux<String> zippedFlux =
Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f); // 합치기
StepVerifier.create(zippedFlux)
.expectNext("Garfield eats Lasagna")
.expectNext("Kojak eats Lollipops")
.expectNext("Barbossa eats Apples")
.verifyComplete();
}
(3) first() : 먼저 도착한 소스만 받고 나머지는 무시
@Test
public void firstFlux() {
Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")// Flux 생성
.delaySubscription(Duration.ofMillis(100)); //구독을 느리게 시작함
Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");// Flux 생성
Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);//먼저 도착한 소스만 받는다.
//따라서 slowFlux는 없음
StepVerifier.create(firstFlux)
.expectNext("hare")
.expectNext("cheetah")
.expectNext("squirrel")
.verifyComplete();
}
'java > 스프링인액션' 카테고리의 다른 글
11 리액티브 API 개발 (0) | 2021.12.18 |
---|---|
Spring in Action (0) | 2021.12.12 |
8. 비동기 메시지 전송 (0) | 2021.12.05 |
[8.3] 카프카 사용하기 (0) | 2021.12.04 |
[6.3] 스프링 데이터 REST (0) | 2021.11.27 |