이 포스트 시리즈는 Reactive Programming은 토비의 스프링 저자 이일민님의 리액티브 프로그래밍 유튜브 강좌를 공부하며 정리한 내용입니다.
Stream 인터페이스에 iterate 라는 메소드가 있다. 이 메소드는 어떠한 데이터 스트림을 쉽게 만들어 낼 수 있는 메소드이다.
public List<Integer> createSampleIntegerList(int count) {
return Stream.iterate(1, e -> e + 1).limit(count).collect(Collectors.toList());
}
위 코드는 1 ~ 10까지 정수를 담은 리스트를 만든다.
핵심은 iterate(시작값, 값의 변화 함수)
메소드인데, 위 예시에서 시작값은 1, 변화는 1씩 증가시킨다라는 뜻이다.
그리고 limit(10)
을 통해 10개만이라는 제한을 걸어주었다. (걸지 않으면 무한하게 증가하기 때문에 collect를 할 수 없다.)
종종 활용할 곳이 있을것 같아 기록해둔다.
<-
방향으로의 흐름을 업스트림(Upstream)이라 하고 ->
방향으로의 흐름을 다운스트림(Downstream)이라 한다.그리고 데이터는 업스트림에서 다운스트림 방향으로 (->
) 흘러간다.
Publisher -> Data -> Subscriber
<- subscribe(Subscriber)
-> onSubscribe(Subscription)
-> onNext
-> onNext
-> ...
-> onComplete
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, e -> e + 1).limit(10).collect(Collectors.toList()));
pub.subscribe(logSub());
}
/**
* Iterator의 데이터를 발생시키는 Publisher
* @param iter 데이터 소스
* @return Publisher
*/
private static Publisher<Integer> iterPub(Iterable<Integer> iter) {
return new Publisher<>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
// 이 예시 코드에선 Subscriber의 요청갯수인 'n' 인자에 대한 구현은 안되어있다.
try {
iter.forEach(subscriber::onNext);
subscriber.onComplete();
} catch (Exception e) {
subscriber.onError(e);
}
}
@Override
public void cancel() {
// Subscriber가 어떠한 경우로든 Publisher에게 데이터를 그만보내라고 요청하는것
// 그에 대비해 Publisher는 이 메소드안에 적절한 내용을 구현해두어야함
// 일반적으로 flag를 두고 request쪽에서 데이터를 더 보내지 않도록 함
}
});
}
};
}
/**
* Publisher에서 수신한 데이터를 출력하는 Subscriber
* @return Subscriber
*/
private static Subscriber<Integer> logSub() {
return new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("onSubscribe");
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError: " + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
}
}
아래 mapPub
함수에서 스트림의 흐름을 이해하기 쉽게 방향과 대조해보면
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, e -> e + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> mapPub = mapPub(pub, e -> e * 10); // 기존 값에 10을 곱하는 함수 전달
mapPub.subscribe(logSub());
}
private static Publisher<Integer> mapPub(Publisher<Integer> publisher, Function<Integer, Integer> mappingFunc) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
// 인자로 전달된 기존 publisher.subscribe의 인자에 새 Subscriber를 생성하여 넘겨준다.
// 이 때, 새 Subscriber에 기존 Subscriber의 기능들을 연결시키고, onNext에 데이터를 가공하는 기능만 부가적으로 추가해준다.
publisher.subscribe(new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(Integer integer) {
// 이전 퍼블리셔에서 발생된 데이터에 'mappingFunc' 적용하여 전달
subscriber.onNext(mappingFunc.apply(integer));
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
};
}
}
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> pub = iterPub(Stream.iterate(1, e -> e + 1).limit(10).collect(Collectors.toList()));
Publisher<Integer> sumPub = sumPub(pub);
sumPub.subscribe(logSub());
}
private static Publisher<Integer> sumPub(Publisher<Integer> publisher, Function<Integer, Integer> mappingFunc) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
publisher.subscribe(new Subscriber<>() {
// 업스트림에서 전달되는 값을 합해둘 멤버변수 선언
private int sum = 0;
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(Integer integer) {
// 업스트림에서 데이터가 전달되면 바로 전달하지 않고 값을 더해둠
this.sum += integer;
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
// 업스트림에서 데이터 전달이 끝나면 더해둔 값을 다운스트림으로 보냄
subscriber.onNext(sum);
// 그리고 현재 스트림은 끝났음을 통지
subscriber.onComplete();
}
});
}
};
}
}
mapPub
메소드에 제네릭을 적용하면 아래와 같이 작성할 수 있다.private static <T, R> Publisher<R> mapPub(Publisher<T> publisher, Function<T, R> mappingFunc) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> subscriber) {
publisher.subscribe(new Subscriber<T>() {
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
}
@Override
public void onNext(T integer) {
subscriber.onNext(mappingFunc.apply(integer));
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
};
}
디펜던시
implementation 'io.projectreactor:reactor-core:3.4.9'
위에 Publisher, Subscriber 직접 생성하고 연결하는 코드들은 모두 내부에 감춰져있고 아래와 같은 간단한 코드로 하나의 흐름을 만들수있다.
Flux.<Integer>create(e -> {
e.next(1);
e.next(2);
e.next(3);
e.complete();
})
.map(e -> e * 10)
.reduce(0, (a, b) -> a + b)
.subscribe(s -> System.out.println(s));