---
title: Echo Clientを実装して学ぶReactor NettyのTCP Client
tags: ["Reactor", "Reactor Netty", "Netty", "Java"]
categories: ["Programming", "Java", "reactor", "ipc", "netty"]
date: 2018-01-05T19:43:12Z
updated: 2018-01-17T15:32:05Z
---

[前回](/entries/438)、Echo Serverを実装してReactor NettyでTCP Serverを作成する方法を学びました。
今回はクライアント編です。


**目次**
<!-- toc -->

### 依存ライブラリの追加

依存ライブラリはServerと同じです。
Reactor 3.1.x系のバージョン管理は`reactor-bom`の`Bismuth`でメンテナンスされており、これを`<dependencyManagement>`に設定しておけば`<dependencies>`内でのバージョン明示は不要です。

```xml
    <dependencies>
        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
```

### Echo Serverを実装

Reactor Nettyの`TcpCliet`の使い方のテンプレートは次の通りで、`TcpServer`と基本的には同じです。


``` java
import org.reactivestreams.Publisher;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpClient;

import java.util.function.BiFunction;

public class EchoClient {

    public static void main(String[] args) throws Exception {
        TcpClient client = TcpClient.create("localhost", 7777);
        client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
            @Override
            public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
                return null /* Responseが終了したら完了するPublisher<Void> */;
            }
        });
    }
}
```

ただし、Serverの場合、

`NettyInbound` = Request (サーバーから見てリクエスト受信) , `NettyOutbound` = Response (サーバーから見てレスポンス送信)

だったのが、Clientの場合は、

`NettyInbound` = Response (クライアントから見てレスポンス受信), `NettyOutbound` = Request (クライアントから見てリクエスト送信)

になります。

また、今回はEcho Clientとして、1リクエストを送ると1レスポンスが返って終了するという挙動を期待します。ですので、`startAndAwait`メソッドに渡すハンドラの返り値は
Serverの時のように`Flux.never()`で終了しないストリームを返すのではなく、レスポンスを受け取ったらたら完了するストリームを返すようにします。

> サーバーにつなぎっぱなしのクライアントを実装したい場合は、終わらないストリームを返します。


まずは、リクエストを送信します。今回は1件だけデータを送ることを想定するので`Mono<String>`でデータを表現します。
データの送信には`NettyOutbound.send`や`NettyOutbound.sendString`メソッドが用意されています。返り値は`NettyOutbound`で、この型は`Publisher<Void>`を継承しています。

``` java
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return null /* Responseが終了したら完了するPublisher<Void> */;
    }
});
```

当然、`subscribe`しないと実際にストリームは流れませんが、`startAndAwait`に渡したハンドラの実行結果の`Publisher<Void>`は自動で`subscribe`されます。
すなわち、次のように書くと、サーバーでデータを送信し、送信完了すると終了するクライアント処理ができます。

``` java
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send;
    }
});
```

実行すると次のようなログが出力されます。

```
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0xcc0c8586], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@3d20d50c
[DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] User Handler requesting close connection
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[TRACE] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-tcp-nio-4) [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@673e0aa6
[DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0xcc0c8586, L:/127.0.0.1:63736 - R:localhost/127.0.0.1:7777]
```

Echo Server側を見ると`Hello World!`が出力されるでしょう。


> ちなみに、`startAndAwait`メソッドは名前の通りブロッキングなメソッドです。ハンドラの実行結果の`Publisher`を`subscribe`して完了するまでブロックします。自分で制御したい場合は次のように書けます。
> 
> ``` java
> Mono<? extends NettyContext> handler = client.newHandler(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() { /* ... */ });
> handler.subscribe();
> ```


サーバーからのレスポンスも取得したいので、`send`が終わったらデータの受信する処理(`inbond.receive()`)をつなげます。処理をつなげる場合は`then`メソッドが使えます。

``` java
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive().then());
    }
});
```

`NettyOutbound.then`の引数は`Publisher<Void>`でないといけないので、`receive()`の結果に対して更に`then()`を実行します。

受信したデータを`asString()`メソッドで`Flux<String>`に変換し、標準出力に表示してみます。

``` java
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive()
                                .asString()
                                .doOnNext(s -> System.out.println("Received => " + s))
                                .then());
    }
});
```

実行すると次のようなログが出力されます。

```
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0x54e9a483], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@34d84ada
[DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0x54e9a483, L:/127.0.0.1:63727 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
Received => Hello World!
```

一見うまくいっているように見えますが、実は問題があります。プログラムが終了していません。
わかりやすいように`log`メソッドを追加します。

``` java
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive()
                                .asString()
                                .log() // added
                                .doOnNext(s -> System.out.println("Received => " + s))
                                .then());
    }
});
```

この状態で実行すると、次のようなログが出力されます。


```
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0xd4aa0cdf], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@6ffe96da
[DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0xd4aa0cdf, L:/127.0.0.1:63777 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) onSubscribe(FluxMap.MapSubscriber)
[ INFO] (reactor-tcp-nio-4) request(unbounded)
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[ INFO] (reactor-tcp-nio-4) onNext(Hello World!)
Received => Hello World!
```

確かにデータは受信できているのですが、`onComplete`が呼ばれていません。そのため、ハンドラーの返り値のストリームはまだ受信するデータがあるかもしれないから終わらない、という状態です。
どのタイミングで終了したいかはプロトコル次第ですが、今回は一件データが取得できればそこで終了したいです。
なので、`Flux`のデータを一件だけ受け取ったら終了する`Mono`に変換すれば良いです。この用途にぴったりのメソッドが[`Flux.next()`](https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#next--)です。


![](https://raw.githubusercontent.com/reactor/reactor-core/v3.1.2.RELEASE/src/docs/marble/next.png)

`asString()`の後に`next()`を追加します。

``` java
client.startAndAwait(new BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>() {
    @Override
    public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
        Mono<String> input = Mono.just("Hello World!");
        NettyOutbound send = outbound.sendString(input);
        return send.then(inbound.receive()
                                .asString()
                                .next() // added
                                .log()
                                .doOnNext(s -> System.out.println("Received => " + s))
                                .then());
    }
});
```

この状態で実行すると、次のようなログが出力されます。


```
[DEBUG] (main) Using Console logging
[DEBUG] (main) Default epoll support : false
[DEBUG] (main) New tcp client pool for localhost:7777
[DEBUG] (main) Acquiring existing channel from pool: DefaultPromise@4ba2ca36(incomplete) SimpleChannelPool{activeConnections=1}
[DEBUG] (reactor-tcp-nio-4) Created [id: 0x778cbeac], now 1 active connections
[DEBUG] (reactor-tcp-nio-4) After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
[DEBUG] (reactor-tcp-nio-4) Acquired active channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777]
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] handler is being applied: com.example.demotcpclient.EchoClient$1@34d84ada
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Writing object FluxMapFuseable
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Pending write size = 12
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
[ INFO] (reactor-tcp-nio-4) onSubscribe(MonoNext.NextSubscriber)
[ INFO] (reactor-tcp-nio-4) request(unbounded)
[ INFO] (reactor-tcp-nio-4) Started TcpClient on localhost/127.0.0.1:7777
[ INFO] (reactor-tcp-nio-4) onNext(Hello World!)
Received => Hello World!
[ INFO] (reactor-tcp-nio-4) onComplete()
[DEBUG] (reactor-tcp-nio-4) [Channel] [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] User Handler requesting close connection
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing ChannelOperation from a channel
[TRACE] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] End of the pipeline, User event [Handler Terminated]
[DEBUG] (reactor-tcp-nio-4) [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@685b8b20
[DEBUG] (reactor-tcp-nio-4) Releasing channel: [id: 0x778cbeac, L:/127.0.0.1:63821 - R:localhost/127.0.0.1:7777]
```

`onComplete`が呼ばれ、無事プログラムが終了しました。

ラムダ式を使えば次のようにスッキリします。

``` java
client.startAndAwait((inbound, outbound) -> {
    Mono<String> input = Mono.just("Hello World!");
    NettyOutbound send = outbound.sendString(input);
    return send.then(inbound.receive()
                            .asString()
                            .next()
                            .log()
                            .doOnNext(s -> System.out.println("Received => " + s))
                            .then());
});
```

ほぼOKなのですが、TCP Clientを作ったならば、返り値も`Mono<String>`で受けたいです。この場合は、次のように`Mono.create`メソッドを使用します。


``` java
Mono<String> result = Mono.create(sink ->
        client.startAndAwait((inbound, outbound) -> {
            Mono<String> input = Mono.just("Hello World!");
            NettyOutbound send = outbound.sendString(input);
            return send.then(inbound.receive()
                    .asString()
                    .next()
                    .log()
                    .doOnNext(sink::success)
                    .doOnError(sink::error)
                    .then());
        }));
```

まとめると、`EchoClient`クラスを次のように書けます。

``` java
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.TcpClient;

public class EchoClient {
    private final TcpClient client;

    public EchoClient(String host, int port) {
        this.client = TcpClient.create(host, port);
    }

    public Mono<String> echo(Mono<String> input) {
        return Mono.create(sink ->
                client.startAndAwait((inbound, outbound) -> outbound
                        .sendString(input)
                        .then(inbound.receive()
                                .asString()
                                .next()
                                .doOnNext(sink::success)
                                .doOnError(sink::error)
                                .then())));
    }
}
```

これでNon-BlockingなEcho Clientが出来上がりました。

と、思ったけれど、よく見ると`startAndAwait`はブロッキングなメソッドなので、このままではダメですね。`Mono.create`の中では`start`で止めて、後始末をコールバックで書くのが良いでしょう。

``` java
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.BlockingNettyContext;
import reactor.ipc.netty.tcp.TcpClient;

public class EchoClient {
    private final TcpClient client;

    public EchoClient(String host, int port) {
        this.client = TcpClient.create(host, port);
    }

    public Mono<String> echo(Mono<String> input) {
        return Mono.create(sink -> {
            BlockingNettyContext context = client.start((inbound, outbound) -> outbound
                    .sendString(input)
                    .then(inbound.receive()
                            .asString()
                            .next()
                            .doOnNext(sink::success)
                            .doOnError(sink::error)
                            .then()));
            sink.onDispose(() -> context.getContext().dispose());
        });
    }
}
```


これで`EchoClient`はOKでしょう。次のように利用できます。

``` java
EchoClient echoClient = new EchoClient("localhost", 7777);
Mono<String> result = echoClient.echo(Mono.just("Hello World!"));
result.log()
        .doOnNext(s -> System.out.println("Received => " + s))
        .subscribe();
// mainメソッドで実行する場合はThread.sleep(1000);などして、プログラムが先に終了しないようにブロックする必要あり。
```

### 無限ストリームに対応


では、この`EchoClient`で無限ストリームに対応するために、`Flux`に対応するとどうなるでしょうか。

``` java
public Flux<String> echo(Flux<String> input) {
    // ....
}
```

次の呼び出しに対して、

``` java
Flux<String> input = Flux.interval(Duration.ofMillis(500)).map(i -> "Hi " + i);
Flux<String> result = echoClient.echo(input);
result.take(10)
        .log()
        .doOnNext(s -> System.out.println("Received => " + s))
        .subscribe();
```

次のように出力して欲しいです。

```
Received => Hi 0
Received => Hi 1
Received => Hi 2
Received => Hi 3
Received => Hi 4
Received => Hi 5
Received => Hi 6
Received => Hi 7
Received => Hi 8
Received => Hi 9
```

考えてみてください。
