覚えたら書く

IT関係のデベロッパとして日々覚えたことを書き残したいです。twitter: @yyoshikaw

RxJava - RxJavaでHelloWorld

RxJavaを使いながらリアクティブプログラミングを理解していきたい。

とりあえずは、まず何を置いてもHello World!


準備

pom.xmlに以下の依存関係を追加します。Reactive Streamsにも対応しているバージョン2.Xを使います

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.8</version>
</dependency>


サンプルの実行

■サンプルコード

以下は、ほぼ「RxJavaリアクティブプログラミング (CodeZine BOOKS) 」からの写経です。

Hello, World!-Xの文字列を順に発行して、購読側のonNextで受け取れることを確認しました

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.schedulers.Schedulers;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Arrays;
import java.util.List;

public class HelloWorldRx {

    public static void main(String[] args) throws Exception {
        // 発行するデータ
        final List<String> helloList = Arrays.asList("Hello, World!-1", "Hello, World!-2", "Hello, World!-3");

        Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {

            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                helloList.forEach(s -> emitter.onNext(s));

                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER);


        flowable.observeOn(Schedulers.computation())
            .subscribe(new Subscriber<String>() {

                private Subscription subscription;

                @Override
                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    this.subscription.request(1);
                }

                @Override
                public void onNext(String data) {
                    String threadName = Thread.currentThread().getName();
                    System.out.println(threadName + " onNext -> " + data);

                    this.subscription.request(1);
                }

                @Override
                public void onComplete() {
                    String threadName = Thread.currentThread().getName();
                    System.out.println(threadName + " onComplete");
                }

                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                }
         });

        Thread.sleep(500);
    }
}

■実行結果

RxComputationThreadPool-1 onNext -> Hello, World!-1
RxComputationThreadPool-1 onNext -> Hello, World!-2
RxComputationThreadPool-1 onNext -> Hello, World!-3
RxComputationThreadPool-1 onComplete


もっとシンプルに試すだけなら以下のコードとかでもいいかもしれないですね

■サンプルコード

import io.reactivex.Flowable;

public class HelloWorldRx2 {

    public static void main(String[] args) throws Exception {
        Flowable<String> flowable =
                Flowable.just("Hello, World!-1", "Hello, World!-2", "Hello, World!-3");

        flowable.subscribe(data -> System.out.println(data));

        Thread.sleep(500);
    }
}

■実行結果

Hello, World!-1
Hello, World!-2
Hello, World!-3


とりあえず、RxJavaを使ったHello Worldができました



関連書籍

RxJavaリアクティブプログラミング (CodeZine BOOKS)

RxJavaリアクティブプログラミング (CodeZine BOOKS)