---
title: Spring Cloud Streamでマイクロサービス間メッセージング
tags: ["Spring Boot", "Spring Cloud", "Spring Cloud Stream"]
categories: ["Programming", "Java", "org", "springframework", "cloud", "stream"]
date: 2016-01-06T09:18:59Z
updated: 2016-12-11T14:48:03Z
---

[Spring Cloud Stream](http://cloud.spring.io/spring-cloud-stream/)とはマイクロサービス間のメッセージングを簡単に実現するためのプロジェクトです。
Spring Integrationのマイクロサービス版のような位置付けです。

執筆時点のバージョンは`1.0.0.RELEASE`です。

下図のようなSource(データの送り元)、 Sink(データの受け皿)とよばれるモジュールとそれらをつなぐBinderから成ります。

![image](https://qiita-image-store.s3.amazonaws.com/0/1852/015157ad-dc48-5d53-a869-6174f077b32b.png)

このモジュールが1つのマイクロサービスにあたり、Spring Bootアプリケーションとなります。

また、Binderにはメッセージキューが使用され、[Kafka](http://kafka.apache.org/)、[RabbitMQ](https://www.rabbitmq.com/)、<s>[Redis](http://redis.io/)</s>(Redisは1.0.0には含まれていません)から選べます。

Unixのパイプのように表現すると、

    source | sink

となります。`|`がBinderにあたります。

Source兼SinkであるProcessorも用意されており、

    source | processor | processor | sink

というような使い方もできます。

Spring XDを使ったことがあれば、このあたりの用語に聞き覚えがあると思います。
このプロジェクトができた背景としてはSpring XDのマイクロサービス対応リファクタリング([Sprin Cloud Data Flow](http://cloud.spring.io/spring-cloud-dataflow/))があるのですが、これはまたいつか説明します。

### Spring Cloud Streamを使ってみる

習うより慣れろ、で、まずは動かしてみましょう。
BinderとしてKafkaを使います。

#### Kafkaの準備
ここではDockerでKafkaを立ち上げます。(開発用にローカルマシンを汚したくないため)
ローカルのKafkaを使っても勿論構わないので、その場合はホスト名を`localhost`にして読み替えてください。

``` console
($ docker-machine create dev --provider virtualbox) <-- unless docker machine is set up
$ docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip dev` --env ADVERTISED_PORT=9092 spotify/kafka
$ docker-machine ssh dev -f -N -L 9092:localhost:9092 # port forwarding for Kafka
$ docker-machine ssh dev -f -N -L 2181:localhost:2181 # port forwarding for ZooKeeper
```

#### Sourceの作成

[Spring Initializr](https://start.spring.io)でSource側のプロジェクトを作成しましょう。

「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。
「Artifact」に"demo-source"を入力して、「Generate Project」をクリック。

![image](https://qiita-image-store.s3.amazonaws.com/0/1852/61bf08d7-ef19-f378-45a1-79444857f923.png)

Mavenプロジェクトがzipでダウンロードできるので、zipを展開して、IDEにインポートしてください。
`DemoSourceApplication.java`にコードを追加して、Sourceのモジュールを作成します。

Sourceとして認識させるために、`org.springframework.cloud.stream.annotation.EnableBinding`アノテーションに`org.springframework.cloud.stream.messaging.Source`クラスを指定します[1]。

``` java
@SpringBootApplication
@EnableBinding(Source.class) // [1]
public class DemoSourceApplication {
   // ...
}

```

ちなみに、`Source`クラスは次のような実装になっています。

``` java
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
	
	String OUTPUT = "output";
	
	@Output(Source.OUTPUT)
	MessageChannel output();

}
```

Sourceからメッセージを送信する方法は、いろいろありますが、最も直感的なのは
`Source#output()`メソッドで得られる`MessageChannel`に`org.springframework.messaging.Message`オブジェクトを送る方法です。

次のコードでメッセージを送信する簡単な例を示します。

``` java
package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class DemoSourceApplication {

    @Autowired
    Source source; // [2]

    @RequestMapping(path = "message")
    void greet(@RequestBody String message) { // [3]
        source.output().send(MessageBuilder.withPayload(message).build()); // [4]
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSourceApplication.class, args);
    }
}
```

バインドされた`Source`オブジェクトはインジェクション可能です[2]。
この`Source`オブジェクトを使う処理をSpring MVCのHTTPエンドポイント内に実装します[3]。
`Message`は`MessageBuilder`で作成するのが簡単です[4]。作成した`Message`を`MessageChannel#send`メソッドで送信します。

ちなみに、`Message`クラスはSpring Framework本体に含まれているメッセージ抽象化インターフェースであり、Springの色々なプロジェクトで使用されています。

<iframe src="//www.slideshare.net/slideshow/embed_code/key/fn23uZwM4Cvvfj?startSlide=51" width="595" height="485" frameborder="0" marginwidth="0" marginheight="0" scrolling="no" style="border:1px solid #CCC; border-width:1px; margin-bottom:5px; max-width: 100%;" allowfullscreen> </iframe> <div style="margin-bottom:5px"> <strong> <a href="//www.slideshare.net/makingx/springone-2gx-2014-spring-41-jsug/51" title="SpringOne 2GX 2014 参加報告 &amp; Spring 4.1について #jsug" target="_blank">SpringOne 2GX 2014 参加報告 &amp; Spring 4.1について #jsug</a> </strong> from <strong><a href="//www.slideshare.net/makingx" target="_blank">Toshiaki Maki</a></strong> </div>
`DemoSourceApplication`を実行する前に、`application.properties`にいくつか設定を行います。

``` properties
# [6]
spring.cloud.stream.bindings.output.destination=demo
# [7]
server.port=9000
```

メッセージの宛先名は`spring.cloud.stream.bindings.output.destination`プロパティに設定します[6]。
[7]のサーバーポート番号の指定は必須ではないですが、このあとSinkモジュールも起動するので、重複しないように今のうちに変えておきます。

では`DemoSourceApplication`を実行してください。

`kafkacat`コマンドでメッセージを確認しましょう。(`brew install kafkacat`でインストールできます)

``` console
$ kafkacat -C -b localhost:9092 -t demo
```

Sourceにリクエストを送ると、

``` console
$ curl -H "Content-Type: text/plain" -d Hello localhost:9000/message
```

`kafkacat`側には以下のような出力が表示されます。

``` console
?
 contentType
            "text/plain"Hello
```


Sinkがまだできていないため、送られたメッセージは処理されません。
次にSinkを作成しましょう。

Sourceからメッセージを送信する他の方法はSinkを作成した後に紹介します。

#### Sinkの作成
続いて、Sink側のプロジェクトを作成しましょう。Source同様に[Spring Initializr](https://start.spring.io)で、「Search for dependencies」に"Stream Kafka"を入力し、エンターを入力してください。
「Artifact」に"demo-sink"を入力して、「Generate Project」をクリック

![image](https://qiita-image-store.s3.amazonaws.com/0/1852/edafe2d7-d372-8caf-e557-918eff87ede6.png)

ダウンロードしたzipファイルを展開し、IDEにインポートします。

今度はSinkとして認識させるために、`EnableBinding`アノテーションに`org.springframework.cloud.stream.messaging.Sink`クラスを指定します[8]。



``` java
@SpringBootApplication
@EnableBinding(Sink.class) // [8]
public class DemoSinkApplication {
    // ...
}

```

`Sink`クラスは`Source`クラス同様に次のような実装になっています。

``` java
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {

	String INPUT = "input";

	@Input(Sink.INPUT)
	SubscribableChannel input();

}
```

`Source`から送られるメッセージを処理する方法も色々あるのですが、
一番簡単な`org.springframework.cloud.stream.annotation.StreamListener`アノテーションをメソッドにつける方法を紹介します。


``` java
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class DemoSinkApplication {

    @StreamListener(Sink.INPUT) // [10]
    public void log(Message<String> message) { // [11]
        System.out.println("received " + message);
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSinkApplication.class, args);
    }
}

```

メッセージを処理するメソッドに`@StreamListener`アノテーションをつけます。`value`属性には`Sink.INPUT`を指定します[10]。
メソッドの引数には`Message`をとります[11]。


`@StreamListener`メソッドの引数は`Message`クラスではなく、メッセージの中身(Payload)だけとることもできます[12]。この場合はSpringのAPIを明示的に使わなくてもSinkモジュールを作成できます。

``` java
    @StreamListener(Sink.INPUT) // [10]
    public void log(String message) { // [11]
        System.out.println("received " + message);
    }
```

引数に`@Header("ヘッダー名")`を付ければヘッダーの値を取ることもできます。


`application.properties`にSource同様、メッセージの宛先名[14]、アプリケーションのポート番号[16]を設定します。`spring.cloud.stream.bindings.output.destination`と`spring.cloud.stream.bindings.input.destination`の値は同じでないと繋がりません。Consumer Group名`hello`を`spring.cloud.stream.bindings.input.group`に指定します[15]。この値を設定しないと、同じSinkをスケールアウトした際に、同じメッセージが全てのノードに流れてしまいます。

``` properties
# [14]
spring.cloud.stream.bindings.input.destination=demo
# [15]
spring.cloud.stream.bindings.input.group=hello
# [16]
server.port=9001
```

`DemoSinkApplication`を実行し、Source側にリクエストを送ると、

``` console
$ curl -H "Content-Type: text/plain" -d Spring localhost:9000/message
```

Sink側にログが出力されます。

``` console
received GenericMessage [payload=Spring, headers={kafka_offset=2, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=16, contentType=text/plain}]
```

これで`source | sink`ができました。

Processorを挟む例はまた今度。基本的には

``` java
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
```

をつけて、一度受信した後にまた次へ送信するモジュールを挟む感じです。

サンプルは[こちら](https://github.com/spring-cloud/spring-cloud-stream/tree/master/spring-cloud-stream-samples/transform)。

### Sourceの修正

Source側でメッセージを送信する別の方法をみてみましょう。先ほどは`MessageChannel`を使って任意のタイミングでメッセージを送信しましたが、`@InboundChannelAdapter`アノテーションを使ってメッセージ生成方法を指定する方法があります。

`Message`オブジェクトを生成するための`org.springframework.integration.core.MessageSource`に`@InboundChannelAdapter`をつけてBean定義すれば、定期的に`MessageSource#receive`メソッドが呼ばれ、生成された`Message`が送信されます。
(`receive`というメソッド名に違和感がありますが、[javadoc](http://docs.spring.io/spring-integration/api/org/springframework/integration/core/MessageSource.html#receive--)にはSourceから次のメッセージを取り出すためのメソッドと書かれています。)

`DemoSourceApplication`を次のように書き換えます。

``` java
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {

    @Bean
    @InboundChannelAdapter(value= Source.OUTPUT, poller= @Poller(fixedRate = "1000", maxMessagesPerPoll= "1")) // [16]
    MessageSource<String> source() {
        return new MessageSource<String>() {
            @Override
            public Message<String> receive() {
                return MessageBuilder.withPayload("Hello!").build();
            }
        };
        // もちろん return () -> MessageBuilder.withPayload("Hello!").build(); でもOK
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSourceApplication.class, args);
    }
}
```

`@InboundChannelAdapter`に送信先を、`@Poller`で送信間隔と一回のタイミングで何通`Message`を送るか(=`receive`メソッドを何回呼ぶか)を指定できます。

この`DemoSourceApplication`を実行すると、Sink側に次のようなログが出力され、定期的にメッセージを受信していることがわかります。

``` console
received GenericMessage [payload=Hello!, headers={kafka_offset=3, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=54, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=4, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=55, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=5, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=56, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=6, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=57, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=7, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=58, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=8, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=59, contentType=text/plain}]
received GenericMessage [payload=Hello!, headers={kafka_offset=9, kafka_messageKey=null, kafka_topic=demo, kafka_partitionId=0, kafka_nextOffset=60, contentType=text/plain}]
...
```

`@InboundChannelAdapter`はPOJOにもつけることができます。次の例のように、DIコンテナに管理されているコンポーネントのメソッドに`@InboundChannelAdapter`をつけると、そのメソッドが定期的に実行され、メッセージが送信されます。メソッドの返り値が`Message`以外のクラスである場合は、返り値のオブジェクトがメッセージの本文(Payload)に設定されます。

``` java
package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.stereotype.Component;

@SpringBootApplication
@EnableBinding(Source.class)
public class DemoSourceApplication {

    @Component
    public static class Greeter {
        @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedRate = "1000", maxMessagesPerPoll = "1"))
        public String greet() {
            return "Hello!";
        }
    }

    public static void main(String[] args) {
        SpringApplication.run(DemoSourceApplication.class, args);
    }
}
```

### Event Driven Microservices with Sprgin Cloud Stream

詳しくはJJUG CCC 2016 Fallで話しました。Spring Cloud Streamやるなら必見。

<iframe src="//www.slideshare.net/slideshow/embed_code/key/olk1yimpPCnvUI" width="595" height="485" frameborder="0" marginwidth="0" marginheight="0" scrolling="no" style="border:1px solid #CCC; border-width:1px; margin-bottom:5px; max-width: 100%;" allowfullscreen> </iframe> <div style="margin-bottom:5px"> <strong> <a href="//www.slideshare.net/makingx/event-driven-microservices-with-spring-cloud-stream-jjugccc-ccca3" title="Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3" target="_blank">Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3</a> </strong> from <strong><a target="_blank" href="//www.slideshare.net/makingx">Toshiaki Maki</a></strong> </div>


### Data Microservices with Spring Cloud Stream, Task, Data Flow

Spring Cloud Streamが生まれる背景となるSpring Cloud Data Flowに関してはSpring Day 2016で話しました。こちらも必見。

<iframe src="//www.slideshare.net/slideshow/embed_code/key/AABJoEYmRgNFxy" width="595" height="485" frameborder="0" marginwidth="0" marginheight="0" scrolling="no" style="border:1px solid #CCC; border-width:1px; margin-bottom:5px; max-width: 100%;" allowfullscreen> </iframe> <div style="margin-bottom:5px"> <strong> <a href="//www.slideshare.net/makingx/data-microservices-with-spring-cloud-stream-task-and-data-flow-jsug-springday" title="Data Microservices with Spring Cloud Stream, Task, and Data Flow #jsug #springday" target="_blank">Data Microservices with Spring Cloud Stream, Task, and Data Flow #jsug #springday</a> </strong> from <strong><a target="_blank" href="//www.slideshare.net/makingx">Toshiaki Maki</a></strong> </div>

### チュートリアル

書いた

https://github.com/Pivotal-Japan/spring-cloud-stream-tutorial/blob/master/README.md
