본문 바로가기

java/스프링인액션

10. 리액터

 

자바 스트림 리액티브 스트림
동기화, 한정된 데이터 작업 수행 무한 데이터셋
비동기 처리 지원
실시간 데이터 처리,
백 프레셔: 데이터 전달 폭주 방지
   

10.3.1~10.3.2 워니

 

반응형 프로그래밍

 

 

이걸 깨닫고 나면 새로운 세상이 열린다.
https://www.youtube.com/watch?v=KDiE5qQ3bZI

 

 

 

함수형 프로그래밍

코딩 중 오류의 소지가 될 수 있는 

변수의 사용을 지양!

순수함수를 사용해서 프로그래밍 하는 것!

변수의 사용을 지양! 순수함수를 사용!

어떤 데이터를 
특정 기준으로 걸러낸 다음
최초 몇개 만 추려내고

특정 형식으로 변환한 다음
한 문자열로 합치는 것

함수형이 아닌 걸로
값이 바뀔 수 있는 변수를 사용했으나 
이제 사용하지 않는다

소프트웨어가 복잡해지면 오류가 생기겠죠?

코드에 노출된 변수들은 위험합니다.
 
 

 

3.1 리액티브 타입 생성하기

 

리액티브 타입 생성하기

리퍼지터리나 서비스로부터

Flux나 Mono가 제공됨. 

 

리액티브 타입을 생성할 필요가 없다.

데이터를 방출하는 새로운 리액티브 publisher를 생성해야한다.

 

리액터는 flux나 Mono를 생성하는 오퍼레이션을 제공한다.

 

오퍼레이션

 

1-1) 객체로부터 Flux 생성하기 : just()

Flux나 Mono 로 생성하려는 하나 이상의 객체가 있다면

Flux나 Mono의 just() 메서드를 사용하여

리액티브 타입을 생성할 수 있다.

 

다섯개의 String 객체 --> Flux를 생성한다

//다섯개의 String 객체 --> Flux를 생성
@Test
public void createAFlux_just(){
	Flux<String> fruitFlux = Flux.just("사과", "오렌지", "포도", "바나나", "딸기");
 }

이 경우 Flux는 생성되지만, 구독자가 없다.

구독자 없이는 데이터가 전달되지 않는다.

리액티브 타입을 구독하려면 데이터가 흘러갈 수 있게 하는 것이다.

 

 

2) Flux -> Subscriber 데이터 전달

//구독자 추가 -> subscribe 메서드 호출
fruitFlux.subscribe(f-> System.out.println("여기 과일: "+f));

 

java.util.Consumer이며

리액티브 스트림의 Subscriber 객체를 생성하기 위해 사용된다.

 

Subscribe()호출 -> 데이터 전달 시작!

Flux ------------------------데-이-터--전-달--시-작--------------------> Subscriber

 

 

3) StepVerifier 으로 테스트하기. (데이터 검사)

 

StepVerifier: 해당 리액티브 타입을 구독 -->

                 스트림을 통해 전달되는 데이터 assertion 적용함 -->

                 스트림이 기대대로 작동하는지 검사함

 

// StepVerifier: 구독 데이터 검사
StepVerifier.create(fruitFlux)
		.expectNext("사과")
        .expectNext("오렌지")
        .expectNext("포도")
        .expectNext("바나나")
        .expectNext("딸기")
        .verifyComplete();

 

 

StepVerifier가 fruitFlux를 구독한 후 --> 데이터 항목이 기대한 과일 이름과 일치하는지 --> 어서션을 적용함 -> fruitFlux가 완전한지 검사함

 

1-2) 컬렉션으로부터 Flux 생성하기 : fromArray()

Flux는 배열, Iterable 객체, 자바 Stream 객체로부터 생성될 수 있다.

배열 -> Flux 생성 : fromArray()

 

//다섯개의 String 객체 --> Flux를 생성
@Test
public void createAFlux_just(){
	Flux<String> fruitFlux = Flux.just("사과", "오렌지", "포도", "바나나", "딸기");// 1
 }

//배열 --> Flux를 생성
@Test
public void createAFlux_fromArray(){
	String[] fruits = new String[]{ "사과", "오렌지", "포도", "바나나", "딸기" };
	Flux<String> fruitFlux = Flux.fromArray(fruits);// 2
    
    // StepVerifier: 구독 데이터 검사
	StepVerifier.create(fruitFlux)
    	.expectNext("사과")
        .expectNext("오렌지")
        .expectNext("포도")
        .expectNext("바나나")
        .expectNext("딸기")
        .verifyComplete();
    
 }
 
 // List -> 
@Test
public void createAFlux_fromIterable(){
	List<String> fruitList = new ArrayList<>();
    			fruitList.add("사과").add("오렌지").add("포도").add("바나나").add("딸기");
	Flux<String> fruitFlux = Flux.fromIterable(fruitList);// 3
    
    // StepVerifier: 구독 데이터 검사
	StepVerifier.create(fruitFlux)
    	.expectNext("사과")
        .expectNext("오렌지")
        .expectNext("포도")
        .expectNext("바나나")
        .expectNext("딸기")
        .verifyComplete();
    
 }
 
 //Stream 객체 ->
 @Test
public void createAFlux_fromIterable(){
	Stream<String> fruitStream = fruitStream.of("사과", "오렌지", "포도", "바나나", "딸기");
	Flux<String> fruitFlux = Flux.fromStream(fruitStream);// 4
    
    // StepVerifier: 구독 데이터 검사
	StepVerifier.create(fruitFlux)
    	.expectNext("사과")
        .expectNext("오렌지")
        .expectNext("포도")
        .expectNext("바나나")
        .expectNext("딸기")
        .verifyComplete();
    
 }

 

4) Flux 데이터 생성: range()

데이터 없이 매번 새 값으로 증가하는 숫자를 방출하는

카운터 역할의 Flux만 필요한 경우

Static 메서드인 range()를 사용해라.

 

//Flux.range(1,5);
@Test
public void createAFlux_range(){
	Flux<Inteager> intervalFF = Flux.range(1,5); //1~5 포함하는 카운터 Flux 생성
    StepVerifier.create(intervalFF)//검사
    	.expectNext(1)
        .expectNext(2)
        .expectNext(3)
        .expectNext(4)
        .expectNext(5)
        .verifyComplete();
        
        
//Flux.interval(시간).take();
@Test
public void createAFlux_range(){
	Flux<Long> intervalFF = Flux.interval(Duration.ofSeconds(5));// Flux 생성
    							.take(5); // 항목은 5개로 한정
    StepVerifier.create(intervalFF)//검사
    	.expectNext(0L)
    	.expectNext(1L)
        .expectNext(2L)
        .expectNext(3L)
        .expectNext(4L)
        .verifyComplete();

 

 

 

3.2 리액티브 타입 조합하기

두개의 리액티브 타입을 결합

하나의 Flux를 ---> 두개 이상 리액티브 타입으로 분할

 

1) 결합 :

(1) mergeWith(): 주는대로 a소스, b소스 합쳐버림

두 개의 Flux 스트림 --> 하나 Flux

@Test
public void mergeFluxes() {

  Flux<String> characterFlux = Flux
      .just("Garfield", "Kojak", "Barbossa") // String -> Flux 객체 생성
      .delayElements(Duration.ofMillis(500)); // 데이터 방출 속도 조절
  Flux<String> foodFlux = Flux
      .just("Lasagna", "Lollipops", "Apples") // String -> Flux 객체 생성
      .delaySubscription(Duration.ofMillis(250)) // 스트리밍 시작 시간 미루기
      .delayElements(Duration.ofMillis(500));// 데이터 방출 속도 조절

  Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux); // mergeWith 함수!!

  StepVerifier.create(mergedFlux)
      .expectNext("Garfield")
      .expectNext("Lasagna")
      .expectNext("Kojak")
      .expectNext("Lollipops")
      .expectNext("Barbossa")
      .expectNext("Apples")
      .verifyComplete();
}

(2) zip() : a소스, b소스를 합치든 뭔가 가공해서 C 소스로 flux객체 생성.

정적 오퍼레이션

zippedFlux 로부터 방출되는 항목: Tuple2

각 소스 Flux가 순서대로 방출하는 항목을 포함한다.

zip 오퍼레이션은 Flux소스를 한 개씩 번갈아 가져와 새로운 Flux를 생성한다.

 

@Test
public void zipFluxes() {
  Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa");// String -> Flux 객체 생성
  Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples");// String -> Flux 객체 생성

  Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux); //zip() 사용

  StepVerifier.create(zippedFlux) // 검사
      .expectNextMatches(p ->
          p.getT1().equals("Garfield") &&
          p.getT2().equals("Lasagna"))
      .expectNextMatches(p ->
          p.getT1().equals("Kojak") &&
          p.getT2().equals("Lollipops"))
      .expectNextMatches(p ->
          p.getT1().equals("Barbossa") &&
          p.getT2().equals("Apples"))
          .verifyComplete();
}

 

Tupel2가 아닌 다른 타입을 사용하고 시픙면

원하는 객체를 zip()에 제공하면 된다.

 

두개의 입력 flux요소부터 메시지를 포함하는 Flux 생성

 

 

@Test
public void zipFluxesToObject() {
  Flux<String> characterFlux = Flux.just("Garfield", "Kojak", "Barbossa"); //Flux 객체 생성
  Flux<String> foodFlux = Flux.just("Lasagna", "Lollipops", "Apples"); //Flux 객체 생성

  Flux<String> zippedFlux =
      Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f); // 합치기

  StepVerifier.create(zippedFlux)
        .expectNext("Garfield eats Lasagna")
        .expectNext("Kojak eats Lollipops")
        .expectNext("Barbossa eats Apples")
        .verifyComplete();
}

(3) first() : 먼저 도착한 소스만 받고 나머지는 무시

 

 

@Test
public void firstFlux() {
  Flux<String> slowFlux = Flux.just("tortoise", "snail", "sloth")// Flux 생성
        .delaySubscription(Duration.ofMillis(100)); //구독을 느리게 시작함
  Flux<String> fastFlux = Flux.just("hare", "cheetah", "squirrel");// Flux 생성

  Flux<String> firstFlux = Flux.first(slowFlux, fastFlux);//먼저 도착한 소스만 받는다. 
  //따라서 slowFlux는 없음

  StepVerifier.create(firstFlux)
      .expectNext("hare")
      .expectNext("cheetah")
      .expectNext("squirrel")
      .verifyComplete();
}

 

'java > 스프링인액션' 카테고리의 다른 글

11 리액티브 API 개발  (0) 2021.12.18
Spring in Action  (0) 2021.12.12
8. 비동기 메시지 전송  (0) 2021.12.05
[8.3] 카프카 사용하기  (0) 2021.12.04
[6.3] 스프링 데이터 REST  (0) 2021.11.27