こんにちは、directのサーバーエンジニアをしている持田です。 会社ブログに初めて記事を書くので、非常に緊張しておりま…せん。
Spring Framework 5 がリリースされて少し経ち、僕も素振りに余年がありませんが、今回も素振りがてら Spring の Reactive websocket を使って簡単な websocket サーバーを書いてみました。 direct でも websocket を利用していますが、 Spring の WebFlux framework でどのように扱うのか非常に気になります!
Spring の WebFlux フレームワークについて
モバイル端末の普及などに伴って低速なネットワークによる大量アクセスなどが増えた結果、 従来のServlet APIを用いたブロッキングなIOでは低速なネットワークからのリクエストによってスレッドが専有されてしまい、CPUなどが有効に活用できません。 そこでノンブロッキングIOを活用してサーバーのスレッド/CPUを有効活用することが求められてきました。 Spring Framework 5 からは Netty 等のノンブロッキングなネットワークフレームワークの上に、 ノンブロッキングなバックプレッシャー制御のある非同期ストリーム処理フレームワークである Reactor をベースとした 新しいWebフレームワークとして WebFlux が追加されました。 WebFlux フレームワークは websocket API を提供しており、非同期な websocket サーバーを http サーバーと同じようなモデルで記述できます。
サンプルプロジェクトについて
今回はエコーサーバーでもいいかと思ったのですが、サンプルのエコーサーバーのコードを見ると、メッセージの取り回しが必要なさそうでした。 メッセージを取り出すコードも書きたかったので、今回は送られてきたメッセージに文字列Hello,をつけて返すサーバーにします。
サーバー
まずはサーバー側の実装を紹介します。プロジェクトの準備には Spring Initializer を使います。
curl https://start.spring.io/starter.tgz \ -d bootVersion=2.0.0.RC1 \ -d dependencies=webflux,lombok \ -d type=gradle-project \ -d baseDir=app | tar -xzvf -
解凍されたディレクトリーにある build.gradle には次のような依存ライブラリーが追加されていると思います。
dependencies {
compile('org.springframework.boot:spring-boot-starter-webflux')
compileOnly('org.projectlombok:lombok')
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile('io.projectreactor:reactor-test')
// eclipse-collections は僕が追加したライブラリーです
compile group: 'org.eclipse.collections', name: 'eclipse-collections', version: '9.1.0'
}
まずは websocket のハンドラークラスを作ります。
package com.example.app; import java.nio.charset.StandardCharsets; import java.time.Duration; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class HellowebsocketHandler implements WebSocketHandler { @Override public Mono<Void> handle(final WebSocketSession session) { log.info("new connection: {}", session.getId()); final Flux<WebSocketMessage> result = session .receive() // (1) .delaySubscription(Duration.ofSeconds(2L)) // (2) .map(WebSocketMessage::getPayloadAsText) //(3) .flatMap(text -> this.handle(text, session.bufferFactory())); // (4) return session.send(result); // (5) } private Mono<WebSocketMessage> handle(final String input, final DataBufferFactory factory) { final String message = "Hello, " + input; log.info("coming: {}, going: {}", input, message); final DataBuffer buffer = factory.wrap(message.getBytes(StandardCharsets.UTF_8)); final WebSocketMessage webSocketMessage = new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer); // (6) return Mono.just(webSocketMessage); } }
- websocket のメッセージが到着したらメッセージを取り出します (
Flux<WebSocketMessage>が返ってくる) - websocket らしく(?)メッセージを
2秒ほど遅らせて返します WebSocketMessage#getPayloadAsTextメソッドにより、Stringでメッセージを取り出しますWebSocketMessageにするためにDataBufferの形にする必要があるので、DataBufferFactoryをもらって、handle(String, DataBufferFactory)メソッドに渡しますWebSocketSession#sendメソッドによりメッセージを送信します(戻り値がMono<Void>)- レスポンスメッセージをテキストメッセージとして構築します
次にアプリケーションの main メソッドのあるクラスを組み立てます。
package com.example.app; import java.util.Map; import org.eclipse.collections.impl.factory.Maps; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.web.reactive.HandlerMapping; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.server.WebSocketService; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; @SpringBootApplication @EnableWebFlux // (1) public class WebApp { public static void main(String[] args) { SpringApplication.run(WebApp.class, args); } @Bean WebSocketService webSocketService() { // (2) return new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy()); } @Bean WebSocketHandlerAdapter handlerAdapter() { // (3) return new WebSocketHandlerAdapter(webSocketService()); } @Bean HandlerMapping handlerMapping() { final Map<String, WebSocketHandler> handlerMappings = Maps.immutable .<String, WebSocketHandler>of("/ws/hello", new HelloWebSocketHandler()) // (4) .castToMap(); final SimpleUrlHandlerMapping urlHandlerMapping = new SimpleUrlHandlerMapping(); urlHandlerMapping.setUrlMap(handlerMappings); urlHandlerMapping.setOrder(-1); return urlHandlerMapping; } }
- WebFlux を利用するため、
@EnableWebFluxをつけます - http から websocket に切り替えるための
RequestUpgradeStrategyの実装を指定します WebSocketHandlerをDispatcherHandlerに接続します- URL
/ws/helloにきた websocket のリクエストをHelloWebSocketHandlerに割り振ります
これでサーバー側の実装は完了です
クライアント
クライアントですが、 WebFlux に付随しているクライアントを利用しようとしたのですが、Spring Webから切り離せそうになかったので、 普通に Tyrus を使いました。 Tyrus については他のサイト等にも掲載されていますので、そちらもあわせて参照して下さい。
build.gradle
plugins {
id 'java'
}
repositories {
mavenCentral()
}
dependencies {
compile group: 'org.glassfish.tyrus', name: 'tyrus-container-grizzly-client', version: '1.13.1'
compile group: 'org.glassfish.tyrus', name: 'tyrus-client', version: '1.13.1'
compile group: 'javax.websocket', name: 'javax.websocket-client-api', version: '1.1'
compile 'org.slf4j:slf4j-api:1.7.25'
compile 'ch.qos.logback:logback-classic:1.2.3'
compile group: 'io.projectreactor', name: 'reactor-core', version: '3.1.4.RELEASE'
}
クライアントのハンドラー
package com.example.tyrus; import javax.websocket.ClientEndpoint; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @ClientEndpoint public class TyrusClient { private static final Logger log = LoggerFactory.getLogger(TyrusClient.class); @OnOpen public void onOpen(final Session session) { log.info("open session to server: {}", session.getId()); } @OnMessage public void onMessage(final Session session, final String message) { log.info("message coming from server : {}", message); } }
単にメッセージが飛んできたらログをとるだけです。
クライアントのアプリのmain
package com.example.tyrus; import static com.example.tyrus.ExceptionalConsumer.consumer; import static com.example.tyrus.ExceptionalSupplier.supplier; import java.net.URI; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.websocket.DeploymentException; import javax.websocket.Session; import org.glassfish.tyrus.client.ClientManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class TyrusApp { private static final Logger log = LoggerFactory.getLogger(TyrusClient.class); private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(); public static void main(String[] args) throws DeploymentException { final ClientManager clientManager = ClientManager.createClient(); final Future<Session> futureSession = clientManager.asyncConnectToServer( TyrusClient.class, URI.create("ws://localhost:8080/ws/hello")); log.info("start application"); Mono.fromFuture(CompletableFuture.supplyAsync(supplier(futureSession::get), EXECUTOR)) .delaySubscription(Duration.ofSeconds(2L)) // (1) .doOnNext(consumer(session -> session.getBasicRemote().sendText("tyrus"))) // (2) .delayElement(Duration.ofSeconds(4L)) // (3) .doOnNext(consumer(Session::close)) // (4) .doOnTerminate(() -> log.info("close connection")) .doOnTerminate(EXECUTOR::shutdown) .subscribe(); // (5) } }
無駄に reactor を使ってみました。
- 接続したら
2秒待つ - サーバーに
tyrusというメッセージを送る - イベントの間隔を
4秒にする - 切断する
- イベントの購読を開始する
これでクライアントの実装は終わりです
では早速サーバー、クライアントの順に起動してみます。
その時のログです。
サーバーのログ(抜粋)

クライアントのログ

接続してから4秒後にメッセージが飛んでいる様子がわかると思います。また、送信メッセージ tyrus message を受け取って Hello, tyrus message が返ってきているのがわかると思います。
今回のコードはこちらから入手できます。(幻となった WebFlux 版のクライアント実装もあります)
まとめ
僕がまだ reactor の API(Mono とか Flux とか)に不慣れだったり、結構 low level な API なこともあり、実装での型合わせ(Future からの Mono の生成や、 WebSocketSession の求めるオブジェクト(DataBuffer 、 String などを使えない)の生成など)には結構手こずりました。 ただ、ノンブロッキングなサーバーを利用できると、 direct のサーバーリソースを抑えられる見込みがありますので、今のうちに素振りして慣れておきたいものです。
社員を絶賛募集中
L is B では新しい技術が大好きなエンジニアを募集しています。われこそは!と思う方、思わない方、是非一緒に働きませんか? こちらのURL よりご応募ください。
おことわり
この記事は Spring boot 2.0.0.RC1 での実行結果であり、 Spring Boot 2.0.0 以後での動作を保証するものではありません。


