takeda_san’s blog

JavaFXと機械学習を頑張る方向。

JavaFXとNettyでソケット通信アプリケーションを作ってみよう その1

ちょっとNettyを使う用事があったので、忘れないうちにメモ。
せっかくなのでJavaFXでソケット通信アプリケーションを作ってみる。

今回使っているバージョンは、「4.1.12-Final」です。

まずは、JavaFXを使わないで、サンプルプログラムを動かして動作を確認してみる。
[サーバ] → [クライアント] のような通信をするプログラムです。

ここの「Writing a Time Server」と「Writing a Time Client」からコピペです。

Netty.docs: User guide for 4.x

クライアントの接続を待つ、サーバ側のプログラム

package application.socket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new Server(port).run();
    }
}

つぎが、送信する任意の情報をバイト列に変換するサーバハンドラ。

package application.socket;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

んで、つぎがサーバに接続しに行くクライアント。

package application.socket;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {

    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ClientHandler());
                }
            });

            ChannelFuture f = b.connect(host, port).sync();

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

最後にサーバから送信されてきたバイト列を元の状態に変換する、クライアントハンドラ。

package application.socket;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

んで、サーバ→クライアントの順番で起動する。
(クライアントの実行時引数にサーバ側の待ち受けホスト名:localhost、ポート:8080を指定するのをお忘れなく)

実行結果(クライアント側)

Wed Jul 05 22:24:06 JST 2017

うむ、ちゃんと動いた。

ハンドラって何さ

送信時、受信時のデータを変換するための、クラスっぽい。
パイプラインという処理の流れでデータを変換していて、その一つの処理の塊がハンドラらしい。
公式のこの図が素晴らしくわかりやすい。

ChannelPipeline (Netty API Reference (4.1.12.Final))

                                                 I/O Request
                                            via Channel or
                                        ChannelHandlerContext
                                                      |
  +---------------------------------------------------+---------------+
  |                           ChannelPipeline         |               |
  |                                                  \|/              |
  |    +---------------------+            +-----------+----------+    |
  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  .               |
  |               .                                   .               |
  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
  |        [ method call]                       [method call]         |
  |               .                                   .               |
  |               .                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  +---------------+-----------------------------------+---------------+
                  |                                  \|/
  +---------------+-----------------------------------+---------------+
  |               |                                   |               |
  |       [ Socket.read() ]                    [ Socket.write() ]     |
  |                                                                   |
  |  Netty Internal I/O Threads (Transport Implementation)            |
  +-------------------------------------------------------------------+
 

自分のアプリケーション→別のアプリケーションへの送信時のハンドラをアウトバウンド。
別のアプリケーション→自分のアプリケーションの受信時はインバウンド。

つまり、今回の例だと次のような流れでデータがやり取りされていたということだろう。

(時刻情報)→ [ServerHandler] →(バイト列)→ [ClientHandler] →(時刻情報)
       ↑ここで変換          ↑ここで復元

動作のシーケンスを教えなさいよ

  1. サーバ起動 8080ポートで接続を待つ
  2. クライアント起動 ホスト名:localhost ポート:8080のサーバに接続しに行く
  3. サーバ、接続時に動作する「channelActive」メソッドを実行
    3.1. 時刻情報を取得する
    3.2. 時刻情報をint型としてバイト列に変換する
    3.3. バイト列を送信する
  4. サーバ、送信終了後に接続を切断する
  5. クライアント、データ受信時に動作する「channelRead」メソッドを実行
    5.1. 受信したバイト列をint型として読み込む
    5.2. 元の時刻情報(long型)に復元する
    5.3. コンソールに時刻情報(Date型として)を表示する

ざっくりとこんな感じだろうか。
次回はちょっとコードに手を入れて、任意の文字数のメッセージをやり取りできるようにします。