こんにちは、directのサーバーエンジニアをしている持田です。 会社ブログに初めて記事を書くので、非常に緊張しておりま…せん。
Spring Framework 5 がリリースされて少し経ち、僕も素振りに余年がありませんが、今回も素振りがてら Spring の Reactive websocket を使って簡単な websocket サーバーを書いてみました。 direct でも websocket を利用していますが、 Spring の WebFlux framework でどのように扱うのか非常に気になります!
Spring の WebFlux フレームワークについて
モバイル端末の普及などに伴って低速なネットワークによる大量アクセスなどが増えた結果、 従来のServlet APIを用いたブロッキングなIOでは低速なネットワークからのリクエストによってスレッドが専有されてしまい、CPUなどが有効に活用できません。 そこでノンブロッキングIOを活用してサーバーのスレッド/CPUを有効活用することが求められてきました。 Spring Framework 5 からは Netty 等のノンブロッキングなネットワークフレームワークの上に、 ノンブロッキングなバックプレッシャー制御のある非同期ストリーム処理フレームワークである Reactor をベースとした 新しいWebフレームワークとして WebFlux が追加されました。 WebFlux フレームワークは websocket API を提供しており、非同期な websocket サーバーを http サーバーと同じようなモデルで記述できます。
サンプルプロジェクトについて
今回はエコーサーバーでもいいかと思ったのですが、サンプルのエコーサーバーのコードを見ると、メッセージの取り回しが必要なさそうでした。 メッセージを取り出すコードも書きたかったので、今回は送られてきたメッセージに文字列Hello,
をつけて返すサーバーにします。
サーバー
まずはサーバー側の実装を紹介します。プロジェクトの準備には Spring Initializer を使います。
curl https://start.spring.io/starter.tgz \ -d bootVersion=2.0.0.RC1 \ -d dependencies=webflux,lombok \ -d type=gradle-project \ -d baseDir=app | tar -xzvf -
解凍されたディレクトリーにある build.gradle
には次のような依存ライブラリーが追加されていると思います。
dependencies { compile('org.springframework.boot:spring-boot-starter-webflux') compileOnly('org.projectlombok:lombok') testCompile('org.springframework.boot:spring-boot-starter-test') testCompile('io.projectreactor:reactor-test') // eclipse-collections は僕が追加したライブラリーです compile group: 'org.eclipse.collections', name: 'eclipse-collections', version: '9.1.0' }
まずは websocket のハンドラークラスを作ります。
package com.example.app; import java.nio.charset.StandardCharsets; import java.time.Duration; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @Slf4j public class HellowebsocketHandler implements WebSocketHandler { @Override public Mono<Void> handle(final WebSocketSession session) { log.info("new connection: {}", session.getId()); final Flux<WebSocketMessage> result = session .receive() // (1) .delaySubscription(Duration.ofSeconds(2L)) // (2) .map(WebSocketMessage::getPayloadAsText) //(3) .flatMap(text -> this.handle(text, session.bufferFactory())); // (4) return session.send(result); // (5) } private Mono<WebSocketMessage> handle(final String input, final DataBufferFactory factory) { final String message = "Hello, " + input; log.info("coming: {}, going: {}", input, message); final DataBuffer buffer = factory.wrap(message.getBytes(StandardCharsets.UTF_8)); final WebSocketMessage webSocketMessage = new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer); // (6) return Mono.just(webSocketMessage); } }
- websocket のメッセージが到着したらメッセージを取り出します (
Flux<WebSocketMessage>
が返ってくる) - websocket らしく(?)メッセージを
2
秒ほど遅らせて返します WebSocketMessage#getPayloadAsText
メソッドにより、String
でメッセージを取り出しますWebSocketMessage
にするためにDataBuffer
の形にする必要があるので、DataBufferFactory
をもらって、handle(String, DataBufferFactory)
メソッドに渡しますWebSocketSession#send
メソッドによりメッセージを送信します(戻り値がMono<Void>
)- レスポンスメッセージをテキストメッセージとして構築します
次にアプリケーションの main
メソッドのあるクラスを組み立てます。
package com.example.app; import java.util.Map; import org.eclipse.collections.impl.factory.Maps; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.web.reactive.HandlerMapping; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.server.WebSocketService; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; @SpringBootApplication @EnableWebFlux // (1) public class WebApp { public static void main(String[] args) { SpringApplication.run(WebApp.class, args); } @Bean WebSocketService webSocketService() { // (2) return new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy()); } @Bean WebSocketHandlerAdapter handlerAdapter() { // (3) return new WebSocketHandlerAdapter(webSocketService()); } @Bean HandlerMapping handlerMapping() { final Map<String, WebSocketHandler> handlerMappings = Maps.immutable .<String, WebSocketHandler>of("/ws/hello", new HelloWebSocketHandler()) // (4) .castToMap(); final SimpleUrlHandlerMapping urlHandlerMapping = new SimpleUrlHandlerMapping(); urlHandlerMapping.setUrlMap(handlerMappings); urlHandlerMapping.setOrder(-1); return urlHandlerMapping; } }
- WebFlux を利用するため、
@EnableWebFlux
をつけます - http から websocket に切り替えるための
RequestUpgradeStrategy
の実装を指定します WebSocketHandler
をDispatcherHandler
に接続します- URL
/ws/hello
にきた websocket のリクエストをHelloWebSocketHandler
に割り振ります
これでサーバー側の実装は完了です
クライアント
クライアントですが、 WebFlux に付随しているクライアントを利用しようとしたのですが、Spring Webから切り離せそうになかったので、 普通に Tyrus を使いました。 Tyrus については他のサイト等にも掲載されていますので、そちらもあわせて参照して下さい。
build.gradle
plugins { id 'java' } repositories { mavenCentral() } dependencies { compile group: 'org.glassfish.tyrus', name: 'tyrus-container-grizzly-client', version: '1.13.1' compile group: 'org.glassfish.tyrus', name: 'tyrus-client', version: '1.13.1' compile group: 'javax.websocket', name: 'javax.websocket-client-api', version: '1.1' compile 'org.slf4j:slf4j-api:1.7.25' compile 'ch.qos.logback:logback-classic:1.2.3' compile group: 'io.projectreactor', name: 'reactor-core', version: '3.1.4.RELEASE' }
クライアントのハンドラー
package com.example.tyrus; import javax.websocket.ClientEndpoint; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @ClientEndpoint public class TyrusClient { private static final Logger log = LoggerFactory.getLogger(TyrusClient.class); @OnOpen public void onOpen(final Session session) { log.info("open session to server: {}", session.getId()); } @OnMessage public void onMessage(final Session session, final String message) { log.info("message coming from server : {}", message); } }
単にメッセージが飛んできたらログをとるだけです。
クライアントのアプリのmain
package com.example.tyrus; import static com.example.tyrus.ExceptionalConsumer.consumer; import static com.example.tyrus.ExceptionalSupplier.supplier; import java.net.URI; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.websocket.DeploymentException; import javax.websocket.Session; import org.glassfish.tyrus.client.ClientManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class TyrusApp { private static final Logger log = LoggerFactory.getLogger(TyrusClient.class); private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(); public static void main(String[] args) throws DeploymentException { final ClientManager clientManager = ClientManager.createClient(); final Future<Session> futureSession = clientManager.asyncConnectToServer( TyrusClient.class, URI.create("ws://localhost:8080/ws/hello")); log.info("start application"); Mono.fromFuture(CompletableFuture.supplyAsync(supplier(futureSession::get), EXECUTOR)) .delaySubscription(Duration.ofSeconds(2L)) // (1) .doOnNext(consumer(session -> session.getBasicRemote().sendText("tyrus"))) // (2) .delayElement(Duration.ofSeconds(4L)) // (3) .doOnNext(consumer(Session::close)) // (4) .doOnTerminate(() -> log.info("close connection")) .doOnTerminate(EXECUTOR::shutdown) .subscribe(); // (5) } }
無駄に reactor を使ってみました。
- 接続したら
2
秒待つ - サーバーに
tyrus
というメッセージを送る - イベントの間隔を
4
秒にする - 切断する
- イベントの購読を開始する
これでクライアントの実装は終わりです
では早速サーバー、クライアントの順に起動してみます。
その時のログです。
サーバーのログ(抜粋)
クライアントのログ
接続してから4秒後にメッセージが飛んでいる様子がわかると思います。また、送信メッセージ tyrus message
を受け取って Hello, tyrus message
が返ってきているのがわかると思います。
今回のコードはこちらから入手できます。(幻となった WebFlux 版のクライアント実装もあります)
まとめ
僕がまだ reactor の API(Mono
とか Flux
とか)に不慣れだったり、結構 low level な API なこともあり、実装での型合わせ(Future
からの Mono
の生成や、 WebSocketSession
の求めるオブジェクト(DataBuffer
、 String
などを使えない)の生成など)には結構手こずりました。 ただ、ノンブロッキングなサーバーを利用できると、 direct のサーバーリソースを抑えられる見込みがありますので、今のうちに素振りして慣れておきたいものです。
社員を絶賛募集中
L is B では新しい技術が大好きなエンジニアを募集しています。われこそは!と思う方、思わない方、是非一緒に働きませんか? こちらのURL よりご応募ください。
おことわり
この記事は Spring boot 2.0.0.RC1 での実行結果であり、 Spring Boot 2.0.0 以後での動作を保証するものではありません。