---
title: Upstashの Serverless KafkaにSpring Bootでアクセスするメモ
tags: ["Spring Boot", "Kafka", "Upstash", "Spring for Apache Kafka"]
categories: ["Programming", "Java", "org", "springframework", "kafka"]
date: 2024-07-01T02:58:12Z
updated: 2024-07-01T03:02:43Z
---

[Upstash](https://upstash.com/)の[Serverless Kafka](https://upstash.com/docs/kafka/overall/getstarted)にSpring Bootからアクセスしてみます。

### Upstashアカウントの作成


https://upstash.com/kafka から"Free"の"Start Now"をクリック

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/2d4c7e0b-f34b-4375-801a-688816bb2de7">

Google(等)でログイン

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/c2b51eac-46fc-485a-83ec-16f3cdaf596f">

これだけでアカウント作成完了。

### Kafkaクラスタの作成

ダッシュボードから"Kafka"を選択

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/8eb430d0-e097-4e84-b93f-7af7bd17e3a1">

”Create Cluster"ボタンをクリックし、名前などを入力

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/8516329b-0d33-481f-8683-60b5a6e3ca45">

Topic名を入力して、"Create Topic"ボタンをクリック

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/823cecd6-5187-4c8a-92ff-dceb1aac5d58">

これでKafkaの準備ができました。


### Producerアプリの作成

Spring Initializrでアプリの雛形を作成します。

```
curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-producer \
       -d baseDir=demo-kafka-producer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaProducerApplication | tar -xzvf -
cd demo-kafka-producer
```

Producer用のサンプルコードを作成します。

```java
cat <<'EOF' > src/main/java/com/example/ProducerController.java
package com.example;

import java.util.concurrent.CompletableFuture;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
	private final KafkaTemplate<String, String> kafkaTemplate;

	private final String topic;

	public ProducerController(KafkaTemplate<String, String> kafkaTemplate, @Value("${demo.topic}") String topic) {
		this.kafkaTemplate = kafkaTemplate;
		this.topic = topic;
	}

	@PostMapping(path = "/")
	public CompletableFuture<String> hello(@RequestBody String message) {
		CompletableFuture<SendResult<String, String>> result = this.kafkaTemplate.send(this.topic, message);
		return result.thenApply(r -> r.getProducerRecord().toString());
	}
}
EOF
```

Kafka環境に依らないプロパティを`application.properties`に定義します。

```properties
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.shutdown=graceful
spring.application.name=demo-kafka-producer
EOF
```

Upstash環境に依存するプロパティを`application-upstash.properties`に定義します。

接続するための情報はダッシュボードから確認できます。

<img width="1912" alt="image" src="https://github.com/making/blog.ik.am/assets/106908/1aec5747-66ee-438d-b97d-f0d1f5078ade">

以下の

* `spring.kafka.bootstrap-servers`
* `spring.kafka.jaas.options.password`
* `spring.kafka.jaas.options.username`

を変更してください。

```properties
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
spring.kafka.producer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.producer.security.protocol=SASL_SSL
EOF
```

アプリをビルドし、実行します。profileを`upstash`に設定して、Upstashに接続できるようにします。

```
./mvnw clean package -DskipTests
java -jar target/demo-kafka-producer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
```


### Consumerアプリの作成

Spring Initializrでアプリの雛形を作成します。

```
curl https://start.spring.io/starter.tgz \
       -d artifactId=demo-kafka-consumer \
       -d baseDir=demo-kafka-consumer \
       -d packageName=com.example \
       -d dependencies=kafka,testcontainers,web,actuator \
       -d type=maven-project \
       -d applicationName=DemoKafkaConsumerApplication | tar -xzvf -
cd demo-kafka-consumer
```

Consumer用のサンプルコードを作成します。

```java
cat <<'EOF' > src/main/java/com/example/ConsumerController.java
package com.example;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConsumerController {
	private final List<String> messages = new CopyOnWriteArrayList<>();

	private final Logger log = LoggerFactory.getLogger(ConsumerController.class);

	@GetMapping(path = "")
	public List<String> getMessages() {
		return this.messages;
	}

	@KafkaListener(topics = "${demo.topic}")
	public void onMessage(String message) {
		log.info("onMessage({})", message);
		this.messages.add(message);
	}
}
EOF
```

Kafka環境に依らないプロパティを`application.properties`に定義します。

```properties
cat <<'EOF' > src/main/resources/application.properties
demo.topic=demo
server.port=8082
server.shutdown=graceful
spring.application.name=demo-kafka-consumer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=${spring.application.name}
EOF
```

Upstash環境に依存するプロパティを`application-upstash.properties`に定義します。

以下の

* `spring.kafka.bootstrap-servers`
* `spring.kafka.jaas.options.password`
* `spring.kafka.jaas.options.username`

を変更してください。

```properties
cat <<'EOF' > src/main/resources/application-upstash.properties
spring.kafka.admin.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.admin.security.protocol=SASL_SSL
spring.kafka.bootstrap-servers=XXXX-us1-kafka.upstash.io:9092
spring.kafka.consumer.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.consumer.security.protocol=SASL_SSL
spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=true
spring.kafka.jaas.login-module=org.apache.kafka.common.security.scram.ScramLoginModule
spring.kafka.jaas.options.password=XXXX
spring.kafka.jaas.options.username=XXXX
EOF
```

アプリをビルドし、実行します。profileを`upstash`に設定して、Upstashに接続できるようにします。

```
./mvnw clean package -DskipTests
java -jar target/demo-kafka-consumer-0.0.1-SNAPSHOT.jar --spring.profiles.active=upstash
```

### 動作確認


Producerアプリにリクエストを送ります。

```
curl localhost:8080 -H "Content-Type: text/plain" -d "Hello World"
curl localhost:8080 -H "Content-Type: text/plain" -d "Test"   
```

Consumer側に次のようなログが出力されればOKです。

```
2024-06-30T02:50:22.952+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Hello World)
2024-06-30T02:50:29.202+09:00  INFO 75165 --- [demo-kafka-consumer] [ntainer#0-0-C-1] com.example.ConsumerController           : onMessage(Test)
```

また、送ったメッセージはConsumerアプリののAPIから取得できます。

```
$ curl localhost:8082
["Hello World","Test"]
```
