Рубрики
Без рубрики

SE На Этой Неделе: Создайте сетевое приложение с помощью Netty и ProtoBuf

Что такое Netty и ProtoBuf Netty – асинхронный фрейм сетевого приложения, управляемый событиями… С тегами protobuf, netty, java.

Что такое Нетти и ПротоБуф

  • Netty — асинхронная платформа сетевых приложений, управляемая событиями. Основной целью Netty является создание высокопроизводительных серверов протоколов на основе NIO с разделением и слабой связью компонентов сетевой и бизнес-логики. Он может реализовывать широко известный протокол, такой как HTTP, или ваш собственный конкретный протокол.

  • ProtoBuf — Буферы протокола – это гибкий, эффективный, автоматизированный механизм для сериализации структурированных данных – подумайте о XML, но меньше, быстрее и проще. Вы определяете, как вы хотите, чтобы ваши данные были структурированы один раз, затем вы можете использовать специальный сгенерированный исходный код, чтобы легко записывать и считывать структурированные данные в различные потоки данных и из них, используя различные языки. Вы даже можете обновить структуру данных, не нарушая развернутые программы, скомпилированные в “старом” формате.

ПротоБуф

Во-первых, давайте определим наш Протобуф. Protobuf – это схема для связи между клиентом и сервером. Я определил демонстрационную схему , которая позволяет отправлять простые текстовые сообщения или файлы на сервер.

    syntax = "proto2";
    package org.demo.nettyprotobuf.proto;


    option optimize_for = SPEED;


    enum Type {
      MSG = 0;
      FILE = 1;
    }


    message DemoRequest {
      required Type type = 1;
      optional string requestMsg = 2;
      optional FileMsg file = 3;
    }


    message DemoResponse {
      optional uint32 code = 1;
      optional string responseMsg = 2;
    }


    message FileMsg{
      optional bytes fileBytes = 1;
      optional string filename = 2;
    }  

Сервер

  • Во-первых, нам нужно создать обработчик для обработки запросов от клиента. Если это текстовое сообщение, распечатайте его. Если это файл, сохраните его в папке /tmp . А затем ответьте клиенту.
public class DemoProtocolServerHandler extends SimpleChannelInboundHandler {
      private static final String FILE_DIR = "/tmp/";


      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DemoMessages.DemoRequest msg) {
        if (msg.getType() == DemoMessages.Type.MSG) {
          DemoMessages.DemoResponse.Builder builder = DemoMessages.DemoResponse.newBuilder();
          String message = "Accepted from Server, returning response";
          System.out.println(message);
          builder.setResponseMsg(message)
                  .setCode(0);
          ctx.write(builder.build());
        } else if (msg.getType() == DemoMessages.Type.FILE) {


          byte[] bFile = msg.getFile().toByteArray();
          FileOutputStream fileOuputStream = null;
          try {
            fileOuputStream = new FileOutputStream(FILE_DIR + msg.getFile().getFilename());
            fileOuputStream.write(bFile);
          } catch (Exception e) {
            System.out.println(e);
          }finally {
            try {
              if (fileOuputStream != null) {
                fileOuputStream.close();
              }
            } catch (IOException e) {
              System.out.println(e);
            }
          }
          DemoMessages.DemoResponse.Builder builder = DemoMessages.DemoResponse.newBuilder();
          String message = "File saved to: " + FILE_DIR;
          System.out.println(message);
          builder.setResponseMsg(message)
                  .setCode(0);
          ctx.write(builder.build());
        } else {
          DemoMessages.DemoResponse.Builder builder = DemoMessages.DemoResponse.newBuilder();
          String message = "Unsupported message type " + msg.getType();
          System.out.println(message);
          builder.setResponseMsg(message)
                  .setCode(1);
          ctx.write(builder.build());
        }
      }

      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) {
          ctx.flush();
      }


      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          cause.printStackTrace();
          ctx.close();
      }


    }
  • Давайте создадим канал для обработки входящих запросов. При получении запроса нам нужно его декодировать, кодировать и обрабатывать запрос. И затем трубопровод канала, обратите внимание, что здесь важен порядок. (Вопрос: почему такой порядок? Пожалуйста, взгляните на мои комментарии в коде).
public class DemoServerChannelInitializer extends ChannelInitializer {


      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        // Why this order, decoder, encoder, handler?
        // Note: Every IO operation on a Channel in Netty is non-blocking.
        // This means that every operation is returned immediately after the call. 
        // When recieved message uses decoder, and then response needs encoder,
        // then handling the requests uses handler.
        ChannelPipeline p = ch.pipeline();
        p.addLast(new ProtobufVarint32FrameDecoder());
        p.addLast(new ProtobufDecoder(DemoMessages.DemoRequest.getDefaultInstance()));


        p.addLast(new ProtobufVarint32LengthFieldPrepender());
        p.addLast(new ProtobufEncoder());


        p.addLast(new DemoProtocolServerHandler());
      }


    }
  • Создайте сервер . Давайте по умолчанию привяжем сервер к порту 8080.
public class DemoServer {

      static final int PORT;

      static {
        if (System.getenv("port") == null){
          PORT = 8080;
        } else {
          PORT = Integer.parseInt(System.getenv("port"));
        }
      }


      public static void main(String[] args) throws InterruptedException {

        // Create event loop groups. One for incoming connections handling and 
        // second for handling actual event by workers
        EventLoopGroup serverGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
          ServerBootstrap bootStrap = new ServerBootstrap();
          bootStrap.group(serverGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new DemoServerChannelInitializer());
          // Bind to port 
          bootStrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
          serverGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
        }
      }
    }

Тестовый клиент

Теперь нам нужно создать тестовый клиент для проверки связи между сервером и клиентом.

  • Давайте сначала создадим обработчик клиента обработчик. Обработчик может отправить файл или сообщение на сервер, что зависит от Демонстрационных сообщений. Тип .
public class DemoMsgClientHandler extends SimpleChannelInboundHandler {


      private Channel channel;
      private DemoMessages.DemoResponse resp;
      private final BlockingQueue resps = new LinkedBlockingQueue();
      public DemoMessages.DemoResponse sendRequest(DemoMessages.Type type) {


          DemoMessages.DemoRequest req = null;
          // send File request
          if (DemoMessages.Type.FILE == type) {
              InputStream inputStream = null;
              try {
                  inputStream = getClass().getResourceAsStream("/components.png");


                  DemoMessages.FileMsg fileMsg = DemoMessages.FileMsg.newBuilder()
                          .setFileBytes(ByteString.readFrom(inputStream))
                          .setFilename("components.png")
                          .build();
                  req = DemoMessages.DemoRequest.newBuilder()
                          .setType(DemoMessages.Type.FILE)
                          .setFile(fileMsg)
                          .build();
                  // Send request
                  channel.writeAndFlush(req);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  try {
                      if (inputStream != null) {
                          inputStream.close();
                      }
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          } else {
              // send message request.
               req = DemoMessages.DemoRequest.newBuilder()
                      .setType(DemoMessages.Type.MSG)
                      .setRequestMsg("From Client").build();
          }
        // Send request
        channel.writeAndFlush(req);

        // Now wait for response from server
        boolean interrupted = false;
        for (;;) {
            try {
                resp = resps.take();
                break;
            } catch (InterruptedException ignore) {
                interrupted = true;
            }
        }


        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return resp;
      }


      @Override
      public void channelRegistered(ChannelHandlerContext ctx) {
          channel = ctx.channel();
      }

      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DemoMessages.DemoResponse msg)
          throws Exception {
        resps.add(msg);
      }

      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          cause.printStackTrace();
          ctx.close();
      }
    }
  • Давайте создадим канал для обработки запросов на отправку.
public class DemoClientInitializer  extends ChannelInitializer {


      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new ProtobufVarint32FrameDecoder());
        p.addLast(new ProtobufDecoder(DemoMessages.DemoResponse.getDefaultInstance()));


        p.addLast(new ProtobufVarint32LengthFieldPrepender());
        p.addLast(new ProtobufEncoder());


        p.addLast(new DemoMsgClientHandler());
      }

    }
  • Теперь давайте создадим [клиента] для отправки запросов. Этот клиент отправит на сервер 10 текстовых сообщений и 1 файл.
public class DemoClient {

      static final String HOST;
      static final int PORT;


      static {
        if (System.getenv("port") == null){
          PORT = 8080;
        } else {
          PORT = Integer.parseInt(System.getenv("port"));
        }


        if (System.getenv("host") == null){
          HOST = "127.0.0.1";
        } else {
          HOST =  System.getenv("host");
        }
      }


      public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();


        try {
          Bootstrap bootstrap = new Bootstrap();
          bootstrap.group(group)
                   .channel(NioSocketChannel.class)
                   .handler(new DemoClientInitializer());

          // Create connection 
          Channel c = bootstrap.connect(HOST, PORT).sync().channel();
          DemoMsgClientHandler handle = c.pipeline().get(DemoMsgClientHandler.class);


          int i = 0;
          while (i++ < 10) {
            DemoMessages.DemoResponse resp = handle.sendRequest(DemoMessages.Type.MSG);
            System.out.println("Got response msg from Server: " + resp.getResponseMsg());
            Thread.sleep(1000);
          }


          DemoMessages.DemoResponse resp = handle.sendRequest(DemoMessages.Type.FILE);
          System.out.println("Got response msg from Server: " + resp.getResponseMsg());


          c.close();




        } finally {
          group.shutdownGracefully();
        }

      }
    }

Тест

Теперь пришло время запустить сервер и клиент.

Я загрузил исходный код на GitHub . Есть инструкции о том, как это проверить. Пожалуйста, взгляните на readme. Есть два способа проверить это. Вы можете использовать команду java для запуска сервера и клиента. Или, есть простой способ, я создал докер-композицию. Вы можете запустить их одной командой.

How to build
`mvn clean install`

How to run.

First Option (use docker compose):

start:

`docker-compose up`

build docker image:

`docker-compose build`

stop:

`docker-compose down`

Second Option (Manually):

Start server:

   `java -jar server/target/server-1.0-SNAPSHOT.jar`

Что больше

Нетти – действительно мощный фреймворк. Вы можете использовать его для создания собственного веб-сервера Http. Вы можете задаться вопросом, почему вы хотите создать свой собственный Http-сервер. Http-сервер общего назначения иногда не очень хорошо масштабируется или не очень хорошо работает с вашими случаями. Например, вам может потребоваться реализовать HTTP-сервер, оптимизированный для приложений чата на основе AJAX, потоковой передачи мультимедиа или передачи больших файлов. В исходном коде Netty есть много примеров. Проверить это здесь .

Ссылка:

https://github.com/jiayanguo/nettyprotobufapp https://netty.io/ https://netty.io/wiki/user-guide-for-4.x.html https://www.baeldung.com/netty https://dzone.com/articles/build-a-simple-netty-application-with-and-without https://www.baeldung.com/netty

Оригинал: “https://dev.to/jiayanguo/se-this-week-build-a-network-app-using-netty-and-protobuf-2dk2”