---
title: Echo Serverを実装して学ぶReactor Nettyによるストリーム処理
tags: ["Reactor", "Reactor Netty", "Netty", "Java"]
categories: ["Programming", "Java", "reactor", "ipc", "netty"]
date: 2017-10-10T15:34:25Z
updated: 2018-01-05T16:02:52Z
---

Reactor 3.1, Reactor Netty 0.7が正式に[リリースされた](https://spring.io/blog/2017/09/28/reactor-bismuth-is-out)ので、TCP Server版Hello WorldであるEcho ServerをReactor Nettyで実装してみます。

> ℹ️ 2018-01-04 Bismuth-SR4版 (Reactor 3.1.2, Reactor Netty 0.7.2)にアップデート

**目次**
<!-- toc -->

### 依存ライブラリの追加

Reactor 3.1.x系のバージョン管理は`reactor-bom`の`Bismuth`でメンテナンスされています。これを`<dependencyManagement>`に設定しておけば`<dependencies>`内でのバージョン明示は不要です。

```xml
    <dependencies>
        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
```


### Echo Serverを実装

Reactor Nettyの`TcpServer`の使い方のテンプレートは次の通りです。

``` java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpServer;

import java.util.Optional;
import java.util.function.BiFunction;

public class EchoServer {
    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                // ここに入力 => 出力の処理を書く
                return Flux.never();
            }
        });
    }
}
```

サーバーサイドプログラミングに慣れていれば、"ああメッセージの処理ハンドラをラムダで書けばいいんだな"と思うかもしれません。
これはある意味合っているのですが、通常の"処理ハンドラ"と大きく違うのは、このハンドラ(`BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>`)が扱うのは1件のメッセージではなく、無限ストリームであるという点です。
引数の`NettyInbound`/`NettyOutbound`はServletの`HttpServletRequest`/`HttpServletResponse`のように1リクエスト/レスポンスではなく、このサーバーへの入出力全体を表します。なので、この処理ハンドラの返り値が出力ストリームになるわけでもありません。処理が終わらないように`Flux.never()`(データは流れないけど終わらないストリーム)を返します。

入力として扱うデータを文字列に絞りましょう。この場合、入力となる文字列の無限ストリームを`Flux`で次のように取得できます。

``` java
        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                Flux<String> input = inbound.receive().asString();

                return Flux.never();
            }
        });
```

この入力ストリームを加工して出力ストリームに送ります。
ここで通常のサーバープログラミングのように、1件ずつ処理したいと考えます。`Flux`クラスにはデータ1件ごとのコールバックメソッドとして`doOnNext(Consumer<T>)`が用意されています。これを使うとサーバーサイドのコードは次のようになります。（わかりやすいようにラムダ式を使っていません）

```java
        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                Flux<String> input = inbound.receive().asString();

                input.doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String message) {
                      // 文字列メッセージ一件ずつの処理
                    }
                }).subscribe();

                return Flux.never();
            }
        });
```

注意点として、入力ストリームの`Flux`は`subscribe`メソッドを呼ばないとストリームにデータが流れません。

出力ストリームにこのメッセージを送信するコードは次の通りです。

```java
        tcpServer.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                Flux<String> input = inbound.receive().asString();

                input.doOnNext(new Consumer<String>() {
                    @Override
                    public void accept(String message) {
                        outbound.sendString(Mono.just(message))
                                .then()
                                .subscribe();
                    }
                }).subscribe();

                return Flux.never();
            }
        });
```

`NettyOutbound.sendString`は引数が`Publisher<String>`なので、`String`をメッセージを`Mono.just`で包んでいます。この返り値が出力ストリームになります。この型も`NettyOutbound`です。これは`Publisher<Void>`を継承しています。

入力ストリーム同様に出力ストリームも`subscribe`しないとデータが流れません。
ここでは`then`メソッドで`Mono`に変換して`subscribe`します。

ここまででEcho Serverができました。`main`メソッドを実行してサーバーを起動時、`telnet`でTCPサーバーにアクセスします。

```
$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
hoge ==> 出力
foo <== 入力
foo ==> 出力
bar <== 入力
bar ==> 出力
^]
telnet> quit
Connection closed.
```

`nc`コマンドでも動作確認できます。

```
$ echo -n 'Hello World!' | nc localhost 7777
Hello World!
```

上記のプログラムはラムダ式を使うと次のように書けます。

``` java
    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.doOnNext(message -> outbound.sendString(Mono.just(message))
                    .then()
                    .subscribe()).subscribe();
            return Flux.never();
        });
    }
```

実際にメッセージが出力ストリームに流れているかどうかを確認したい場合は、次のように出力メッセージのストリームに`log`メソッドを追加します。

``` java
    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.doOnNext(message -> outbound.sendString(Mono.just(message).log() /*追加*/)
                    .then()
                    .subscribe()).subscribe();
            return Flux.never();
        });
    }
```

これで再起動し、再度メッセージを送ると、次のようなログを確認できます。`onNext`で出力メッセージが流れていることがわかります。

```
[ INFO] (reactor-tcp-nio-2) | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ INFO] (reactor-tcp-nio-2) | request(32)
[ INFO] (reactor-tcp-nio-2) | onNext(Hello World!)
[TRACE] (reactor-tcp-nio-2) [id: 0x8984720e, L:/0:0:0:0:0:0:0:1:7777 - R:/0:0:0:0:0:0:0:1:52269] Pending write size = 12
[ INFO] (reactor-tcp-nio-2) | request(1)
[ INFO] (reactor-tcp-nio-2) | onComplete()
```

出力側の`subscribe`を削除すると`onNext`ログがでないことを確認できるでしょう。

少し、リファクタしてみます。

上記のコードは入力ストリームの`doOnNext`コールバック内で出力ストリームを作成したため、両方の`subscribe`が必要でした。
ここで考え方を変えて、入力ストリームを変換して出力ストリームを作成するようにすると、作成されたストリームを`subscribe`するだけで入出力両方にデータが流れます。変換したストリームと元のストリームを合流させるには`flatMap`を使います。`flatMap`を使えばEchoServerは次のように書き換えられます。

``` java
    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.flatMap(message -> outbound.sendString(Mono.just(message)))
                    .subscribe();
            return Flux.never();
        });
    }
```

見た目がすっきりしました。

> ちなみに、このコードで`flatMap`を`map`に変えてもコンパイルは通りますが、データは流れなくなります。この違い、とても重要なので理解してください。



### Echo Serverのバッファリング処理

ただのEcho Serverだとつまらないので、もう少しストリーム処理っぽい内容に変えてみます。

`NettyOutbound.sendString`の引数は`Publisher<String>`でした。つまりこのメソッドはメッセージを1件送るためのメソッドではなく、メッセージのストリームを送るメソッドです。

ここでは、EchoServerを3件ごとバッファリングして一気に出力するように書き換えます。`Flux<String>`のストリームを3件ずつの塊ストリームである`Flux<Flux<String>>`に変換するには`window(int)`メソッドを使用します。

``` java
    public static void main(String[] args) {
        TcpServer tcpServer = TcpServer.create("0.0.0.0", 7777);
        tcpServer.startAndAwait((inbound, outbound) -> {
            Flux<String> input = inbound.receive().asString();
            input.window(3)
                    .flatMap(messages -> outbound.sendString(messages))
                    .subscribe();
            return Flux.never();
        });
    }
```

これで3件ずつまとめてデータを扱うEchoServerになりました。
`telnet`で動作確認してみます。

```
$ telnet localhost 7777
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hoge <== 入力
foo <== 入力
bar <== 入力
hoge ==> 出力
foo ==> 出力
bar ==> 出力
^]
telnet> quit
Connection closed.
```

前の例と異なり、3件単位でデータが流れていることがわかります。

---

Echo Serverを実装を通じてReactor Nettyを使ったTCP Serverの作り方及び、ストリームを扱うプログラミングの考え方を簡単に学びました。
Reactorを使うことでこれまで使ってこなかったストリーム脳で考えないといけない場面が増えてくるので、この記事がとっかかりになればと思います。
