Skip to content

Mysql protocol

Sayapin Alexander edited this page Oct 21, 2016 · 34 revisions

Целью данного упражнения является разработка модуля поддержки протокола Mysql в академических целях. Необходимо разобраться с Netty, реализовав подмножество протокола Mysql достаточное для подключения сторонними клиентами и выполнения простых команд.

Исходные данные

Замечания по разработке

  • Создаём очень ограниченное подмножество протокола Mysql
  • Не заморачиваемся с SSL
  • Обработка запросов передаётся дальше пользовательскому обработчику
  • Не делаем поддержку сжатия
  • Не делаем поддержку транзакций
  • Не делаем поддержку статуса сервера
  • Используем протокол версии 4.1
  • Не реализуем настраиваемость протокола (например, возврат COM_EOF вместо COM_OK)
  • Не реализуем prepared statements
  • Не реализуем "честную" аутентификацию

Пара слов о протоколе

Протокол mysql представляет собой бинарный клиент-серверный протокол. В качестве модели представления байт используется little endian (то есть, байты передаются перевёрнутыми). Протокол сессионный, то есть перед работой необходимо пройти аутентификацию, и это является фактом начала сессии. В конце работы необходимо отправить пакет COM_QUIT для завершения сеанса. Протокол синхронный, то есть после запроса необходимо дождаться ответа от сервера.

В работе протокола выделяют 2 фазы:

Пакет состоит из 2-х частей ( https://dev.mysql.com/doc/internals/en/mysql-packet.html ):

  • Заголовка
  • Тела пакета

Заголовок состоит из 2-х полей:

  • Длины тела пакета (3 байта)
  • Номер пакета в группе (1 байт)

пример пакета COM_QUIT.

 01 00 00 00 01
  • 010000 - длина тела пакета (3 байта)
  • 00 - номер пакета
  • 01 - тело пакета

Цель №1: Создать простейший сервер на Netty

Для начала необходимо реализовать простой сервер, который просто принимает подключения и выводит информацию о подключении в лог. Просто возьмём простейший сервер из документации Netty ( http://netty.io/wiki/user-guide-for-4.x.html ).

Основной класс. App.java

public class App {

    public static final Logger logger = LoggerFactory.getLogger(App.class);

    public static int port = 1234;

    public static void main(String[] args) {
        logger.info("Application start");

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        logger.info("Client connected from:{}", ch.remoteAddress()); // Вывод информации о клиента

                        ByteBuf greetingBuffer = ch.alloc().buffer();

                        greetingBuffer.writeBytes("Greeting".getBytes()); // Выводим Greeting при подключении клиента

                        ch.writeAndFlush(greetingBuffer);

                        ch.pipeline().addLast(new DiscardServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("Error on create bootstrap", e);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

        logger.info("Application shutted down");
    }
}

Код DiscardServerHandler.

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    public static final Logger logger = LoggerFactory.getLogger(DiscardServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        logger.debug("channel read");

        ByteBuf byteBuf = (ByteBuf)msg;

        if (logger.isTraceEnabled()) {
            logger.trace("\n{}", ByteBufUtil.prettyHexDump(byteBuf));
        }

        ByteBuf buffer = ctx.alloc().buffer();

        buffer.writeBytes("Hello\n".getBytes());

        ctx.writeAndFlush(buffer);
        
        ((ByteBuf) msg).release();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.debug("Exception", cause);
        
        cause.printStackTrace();
        ctx.close();
    }
}

Подключимся с помощью telnet и отправим пару строк.

$ telnet 127.0.0.1 1234
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Greetings adsadad
Hello

Hello

Hello

Hello

Hello

Hello
Connection closed by foreign host.

Логи приложения.

2016-03-24 19:54:38 nioEventLoopGroup-3-2 INFO com.lrn.nettymysqlprotocol.App Client connected from:/127.0.0.1:53562
2016-03-24 19:54:41 nioEventLoopGroup-3-2 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-24 19:54:41 nioEventLoopGroup-3-2 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 61 64 73 61 64 61 64 0d 0a                   |sadsadad..      |
+--------+-------------------------------------------------+----------------+
2016-03-24 19:54:42 nioEventLoopGroup-3-2 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-24 19:54:42 nioEventLoopGroup-3-2 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
2016-03-24 19:54:43 nioEventLoopGroup-3-2 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-24 19:54:43 nioEventLoopGroup-3-2 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
2016-03-24 19:54:43 nioEventLoopGroup-3-2 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-24 19:54:43 nioEventLoopGroup-3-2 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
2016-03-24 19:54:43 nioEventLoopGroup-3-2 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-24 19:54:43 nioEventLoopGroup-3-2 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+
2016-03-24 19:54:43 nioEventLoopGroup-3-2 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-24 19:54:43 nioEventLoopGroup-3-2 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 0d 0a                                           |..              |
+--------+-------------------------------------------------+----------------+

Пример сервера в ветке pres_0_14_0

Цель №2: Создать сервер, который возвращает greeting записанный с реального сервера

Для начала рассмотрим описание протокола MySQL из документации. https://dev.mysql.com/doc/internals/en/connection-phase.html

Когда клиент подключается к серверу, сервер отправляет initial handshake (или greetings) пакет. В ответ клиент отправляет запрос логина, а сервер возвращает результат аутентификации пользователя.

На текущем этапе детали протокола Mysql не интересуют. Наша задача записать initial handshake с реального сервера и отправить его клиенту в ответ на подключение. Воспользовавшись wireshark был записан обмен пакетами между сервером и клиентом. (src/main/resources/mysqlproto.pcap).

Initial handshake от сервера

0000   5b 00 00 00 0a 35 2e 36 2e 32 38 2d 30 75 62 75  [....5.6.28-0ubu
0010   6e 74 75 30 2e 31 35 2e 30 34 2e 31 00 0b 00 00  ntu0.15.04.1....
0020   00 65 27 31 5e 68 6e 71 6b 00 ff f7 08 02 00 7f  .e'1^hnqk.......
0030   80 15 00 00 00 00 00 00 00 00 00 00 48 63 3c 40  ............Hc<@
0040   6a 78 3d 63 5d 29 51 3e 00 6d 79 73 71 6c 5f 6e  jx=c])Q>.mysql_n
0050   61 74 69 76 65 5f 70 61 73 73 77 6f 72 64 00     ative_password.

Клиент отправляет пакет с запросом аутентификации

0000   ae 00 00 01 05 a6 7f 00 00 00 00 01 21 00 00 00  ............!...
0010   00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00  ................
0020   00 00 00 00 72 6f 6f 74 00 00 6d 79 73 71 6c 5f  ....root..mysql_
0030   6e 61 74 69 76 65 5f 70 61 73 73 77 6f 72 64 00  native_password.
0040   71 03 5f 6f 73 10 64 65 62 69 61 6e 2d 6c 69 6e  q._os.debian-lin
0050   75 78 2d 67 6e 75 0c 5f 63 6c 69 65 6e 74 5f 6e  ux-gnu._client_n
0060   61 6d 65 08 6c 69 62 6d 79 73 71 6c 04 5f 70 69  ame.libmysql._pi
0070   64 05 32 37 38 31 38 0f 5f 63 6c 69 65 6e 74 5f  d.27818._client_
0080   76 65 72 73 69 6f 6e 06 35 2e 36 2e 32 38 09 5f  version.5.6.28._
0090   70 6c 61 74 66 6f 72 6d 06 78 38 36 5f 36 34 0c  platform.x86_64.
00a0   70 72 6f 67 72 61 6d 5f 6e 61 6d 65 05 6d 79 73  program_name.mys
00b0   71 6c                                            ql

Сервер проводит аутентификацию и возвращает успешный ответ.

0000   00 00 03 04 00 06 00 00 00 00 00 00 00 00 08 00  ................
0010   45 08 00 3f 44 40 40 00 40 06 f8 6e 7f 00 00 01  E..?D@@.@..n....
0020   7f 00 00 01 0c ea ee 0d 70 ce 57 ae 11 66 c0 0a  ........p.W..f..
0030   80 18 01 5e fe 33 00 00 01 01 08 0a 02 ac 13 81  ...^.3..........
0040   02 ac 13 81 07 00 00 02 00 00 00 02 00 00 00     ...............

Если мы просто попытаемся подключиться к серверу, например, клиентом mysql, то получим ошибку.

$ mysql -h 127.0.0.1 --port 1234
ERROR 2013 (HY000): Lost connection to MySQL server at 'reading initial communication packet', system error: 2

Таким образом, наша задача вернуть клиенту заранее записанную последовательность initial handshake.

5b0000000a352e362e32382d307562756e7475302e31352e30342e31000b0000006527315e686e716b00fff70802007f80150000000000000000000048633c406a783d635d29513e006d7973716c5f6e61746976655f70617373776f726400

Для начала реализуем простой класс преобразования hex-строки в массив байт.

public class HexUtils {
    public static byte[] hexToByte(String hex) throws Exception {
        if (hex == null || "".equals(hex)) {
            return new byte[]{};
        } else {
            checkHexString(hex);
            
            byte[] result = new byte[hex.length() / 2];

            short tmp;
            int j=0;
            for (int i=0;i<hex.length();i+=2) {
                tmp = (short) (charToShort(hex.charAt(i)) << 4);
                tmp |= charToShort(hex.charAt(i + 1));

                result[j] = (byte) tmp;

                j++;
            }

            return result;
        }
    }
    
    public static final short charToShort(char c) {
        if (c >= 0x30 && c <= 0x39) {
            return (short) (c - 0x30);
        } else {
            if (c >= 'a' && c<='f') {
                return (short) (c - 'a'+10);
            } else {
                if (c>='A' && c<='F') {
                    return (short) (c - 'A'+10);
                } else {
                    throw new RuntimeException("Conversion exception");
                }
            }
        }
    }
    
    public static void checkHexString(String hex) throws Exception {
        if (hex != null) {
            if ( (hex.length() % 2) != 0 ) {
                throw new Exception("Invalid length");
            }
        
            if (!"".equals(hex) && !hex.matches("^[0-9a-fA-F]+$")) {
                throw new Exception("Invalid string");
            }
        }
    }
}

Тест для этого класса.

import com.lrn.nettymysqlprotocol.HexUtils;
import org.junit.Test;
import static org.junit.Assert.*;

public class HexUtilsTest {
    @Test
    public void testValid() throws Exception {
        HexUtils.checkHexString(null);
        HexUtils.checkHexString("");
        HexUtils.checkHexString("0123456789AbCdEf");
    }
    
    @Test(expected = Exception.class)
    public void testInvalidLength() throws Exception {
        HexUtils.checkHexString("012");
    }
    
    @Test(expected = Exception.class)
    public void testInvalidStringSymbols() throws Exception {
        HexUtils.checkHexString(" FakeString");
    }
    
    @Test
    public void testCharToShort() {
        assertEquals(0, HexUtils.charToShort('0'));
        assertEquals(1, HexUtils.charToShort('1'));
        assertEquals(2, HexUtils.charToShort('2'));
        assertEquals(3, HexUtils.charToShort('3'));
        assertEquals(4, HexUtils.charToShort('4'));
        assertEquals(5, HexUtils.charToShort('5'));
        assertEquals(6, HexUtils.charToShort('6'));
        assertEquals(7, HexUtils.charToShort('7'));
        assertEquals(8, HexUtils.charToShort('8'));
        assertEquals(9, HexUtils.charToShort('9'));
        assertEquals(10, HexUtils.charToShort('a'));
        assertEquals(10, HexUtils.charToShort('A'));
        assertEquals(11, HexUtils.charToShort('b'));
        assertEquals(11, HexUtils.charToShort('B'));
        assertEquals(12, HexUtils.charToShort('c'));
        assertEquals(12, HexUtils.charToShort('C'));
        assertEquals(13, HexUtils.charToShort('d'));
        assertEquals(13, HexUtils.charToShort('D'));
        assertEquals(14, HexUtils.charToShort('e'));
        assertEquals(14, HexUtils.charToShort('E'));
        assertEquals(15, HexUtils.charToShort('f'));
        assertEquals(15, HexUtils.charToShort('F'));
    }

    @Test(expected = RuntimeException.class)
    public void testInvalidCharInCharToShort() {
        HexUtils.charToShort('_');
    }

    @Test
    public void testValidConversion() throws Exception {
        assertArrayEquals(new byte[]{0x01}, HexUtils.hexToByte("01"));
        assertArrayEquals(new byte[]{0x23, 0x45}, HexUtils.hexToByte("2345"));
        assertArrayEquals(new byte[]{0x23, 0x45, (byte)0xab,(byte)0xcd,(byte)0xef}, HexUtils.hexToByte("2345aBcDeF"));
        assertArrayEquals(new byte[]{0x23, 0x45, (byte)0xab,(byte)0xcd,(byte)0xef}, HexUtils.hexToByte("2345AbCdEf"));
    }
}

Добавим возврат записанного initial handshake пакета и возврат пакета OK после handshake response.

String greetString = "5b0000000a352e362e32382d307562"
    + "756e7475302e31352e30342e31000b0000006527"
    + "315e686e716b00fff70802007f80150000000000"
    + "000000000048633c406a783d635d29513e006d79"
    + "73716c5f6e61746976655f70617373776f726400";
                        
greetingBuffer
    .writeBytes(
        HexUtils
            .hexToByte(greetString)
        );

ch.writeAndFlush(greetingBuffer);
ByteBuf buffer = ctx.alloc().buffer();

try {
    buffer.writeBytes(HexUtils.hexToByte("0700000200000002000000"));
} catch (Exception ex) {
    logger.error("Error on conversion");
}

ctx.writeAndFlush(buffer);

Запустим приложение и попробуем подключиться с помощью mysql cli client.

$ mysql -h 127.0.0.1 --port=1234
Your MySQL connection id is 11
Server version: 5.6.28-0ubuntu0.15.04.1

mysql> 

Таким образом, mysql cli client решил, что прошёл авторизацию на сервере и отобразил приглашение для ввода.

Логи приложения.

2016-03-26 09:32:13 nioEventLoopGroup-3-1 INFO com.lrn.nettymysqlprotocol.App Client connected from:/127.0.0.1:34522
2016-03-26 09:32:13 nioEventLoopGroup-3-1 DEBUG io.netty.buffer.AbstractByteBuf -Dio.netty.buffer.bytebuf.checkAccessible: true
2016-03-26 09:32:13 nioEventLoopGroup-3-1 DEBUG io.netty.util.ResourceLeakDetector -Dio.netty.leakDetection.level: simple
2016-03-26 09:32:13 nioEventLoopGroup-3-1 DEBUG io.netty.util.ResourceLeakDetector -Dio.netty.leakDetection.maxRecords: 4
2016-03-26 09:32:13 nioEventLoopGroup-3-1 DEBUG io.netty.util.Recycler -Dio.netty.recycler.maxCapacity.default: 262144
2016-03-26 09:32:13 nioEventLoopGroup-3-1 DEBUG io.netty.util.internal.Cleaner0 java.nio.ByteBuffer.cleaner(): available
2016-03-26 09:32:13 nioEventLoopGroup-3-1 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-26 09:32:13 nioEventLoopGroup-3-1 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| ac 00 00 01 05 a6 7f 00 00 00 00 01 21 00 00 00 |............!...|
|00000010| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
|00000020| 00 00 00 00 77 69 7a 00 00 6d 79 73 71 6c 5f 6e |....wiz..mysql_n|
|00000030| 61 74 69 76 65 5f 70 61 73 73 77 6f 72 64 00 70 |ative_password.p|
|00000040| 03 5f 6f 73 10 64 65 62 69 61 6e 2d 6c 69 6e 75 |._os.debian-linu|
|00000050| 78 2d 67 6e 75 0c 5f 63 6c 69 65 6e 74 5f 6e 61 |x-gnu._client_na|
|00000060| 6d 65 08 6c 69 62 6d 79 73 71 6c 04 5f 70 69 64 |me.libmysql._pid|
|00000070| 04 35 32 31 36 0f 5f 63 6c 69 65 6e 74 5f 76 65 |.5216._client_ve|
|00000080| 72 73 69 6f 6e 06 35 2e 36 2e 32 38 09 5f 70 6c |rsion.5.6.28._pl|
|00000090| 61 74 66 6f 72 6d 06 78 38 36 5f 36 34 0c 70 72 |atform.x86_64.pr|
|000000a0| 6f 67 72 61 6d 5f 6e 61 6d 65 05 6d 79 73 71 6c |ogram_name.mysql|
+--------+-------------------------------------------------+----------------+
2016-03-26 09:32:13 nioEventLoopGroup-3-1 DEBUG com.lrn.nettymysqlprotocol.DiscardServerHandler channel read
2016-03-26 09:32:13 nioEventLoopGroup-3-1 TRACE com.lrn.nettymysqlprotocol.DiscardServerHandler 
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 21 00 00 00 03 73 65 6c 65 63 74 20 40 40 76 65 |!....select @@ve|
|00000010| 72 73 69 6f 6e 5f 63 6f 6d 6d 65 6e 74 20 6c 69 |rsion_comment li|
|00000020| 6d 69 74 20 31                                  |mit 1           |
+--------+-------------------------------------------------+----------------+

Кроме того, видно что mysql cli client после подключения пытается выполнить запрос select @@version_comment limit 1.

Полный код App.java

public class App {

    public static final Logger logger = LoggerFactory.getLogger(App.class);

    public static int port = 1234;

    public static void main(String[] args) {
        logger.info("Application start");

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        logger.info("Client connected from:{}", ch.remoteAddress());

                        ByteBuf greetingBuffer = ch.alloc().buffer();

                        String greetString = "5b0000000a352e362e32382d307562"
                                + "756e7475302e31352e30342e31000b0000006527"
                                + "315e686e716b00fff70802007f80150000000000"
                                + "000000000048633c406a783d635d29513e006d79"
                                + "73716c5f6e61746976655f70617373776f726400";
                        
                        greetingBuffer
                            .writeBytes(
                                HexUtils
                                    .hexToByte(greetString)
                            );

                        ch.writeAndFlush(greetingBuffer);

                        ch.pipeline().addLast(new DiscardServerHandler());
                    }

                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("Error on create bootstrap", e);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

        logger.info("Application shutted down");
    }
}

Полный код DiscardServerHandler

public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    public static final Logger logger = LoggerFactory.getLogger(DiscardServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        logger.debug("channel read");

        ByteBuf byteBuf = (ByteBuf)msg;

        if (logger.isTraceEnabled()) {
            logger.trace("\n{}", ByteBufUtil.prettyHexDump(byteBuf));
        }

        ByteBuf buffer = ctx.alloc().buffer();

        try {
            buffer.writeBytes(HexUtils.hexToByte("0700000200000002000000"));
        } catch (Exception ex) {
            logger.error("Error on conversion");
        }

        ctx.writeAndFlush(buffer);
        
        ((ByteBuf) msg).release();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.debug("Exception", cause);

        cause.printStackTrace();
        ctx.close();
    }
}

Полный код доступен в ветке pres_0_14_01 https://github.com/wizardjedi/my-spring-learning/tree/pres_0_14_01 .

Цель №3: Создать сервер, который позволяет проходить handshake

На данном этапе необходимо реализовать 2 класса, которые будут представлять команды initial handshake и ok, а также класс encoder'а который будет кодировать пакеты в соответствующие байты.

Для начала реализуем интерфейс всех пакетов.

Packet.java

/**
 * Base interface for all packets
 */
public interface Packet {
    /**
     * Read body from byte buffer
     * @param bb 
     */
    void readBody(ByteBuf bb) throws Exception;
    
    /**
     * Write body to byte buffer
     * @param bb 
     */
    void writeBody(ByteBuf bb) throws Exception;
    
    /**
     * Calculate and return body size in bytes
     * @return 
     */
    int calculateBodyLength();
    
    /**
     * Calculate and set field with body length
     */
    void calculateAndSetBodyLength();
    
    /**
     * Get body length from field
     * @return 
     */
    int getBodyLength();

    /**
     * Get sequence number field value
     * @return 
     */
    int getSequenceNumber();
    
    /**
     * Set sequence number field value
     * @param sequenceNumber 
     */
    void setSequenceNumber(int sequenceNumber);    
}

Базовый класс для всех пакетов BasePacket.java

/**
 * Base class for all packets
 */
public abstract class BasePacket implements Packet {
    protected int bodyLength = 0;
    protected int sequenceNumber = 0;
    
    @Override
    public void calculateAndSetBodyLength() {
        this.bodyLength = calculateBodyLength();
    }

    @Override
    public int getBodyLength() {
        return bodyLength;
    }

    @Override
    public int getSequenceNumber() {
        return sequenceNumber;
    }

    @Override
    public void setSequenceNumber(int sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
    }
}

Кроме базовых классов нам понадобятся константы принятые для Mysql. Значение констант можно найти как в документации к протоколу, так и в исходном коде mysql. MysqlConstants

public class MysqlConstants {
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/text-protocol.html
     * @see https://github.com/mysql/mysql-server/blob/HEAD/include/my_command.h
     */
    public static class CommandConstants {
        public static final int COM_SLEEP = 0;
        public static final int COM_QUIT = 1;
        public static final int COM_INIT_DB = 2;
        public static final int COM_QUERY = 3;
        public static final int COM_FIELD_LIST = 4;
        public static final int COM_CREATE_DB = 5;
        public static final int COM_DROP_DB = 6;
        public static final int COM_REFRESH = 7;
        public static final int COM_SHUTDOWN = 8;
        public static final int COM_STATISTICS = 9;
        public static final int COM_PROCESS_INFO = 10;
        public static final int COM_CONNECT = 11;
        public static final int COM_PROCESS_KILL = 12;
        public static final int COM_DEBUG = 13;
        public static final int COM_PING = 14;
        public static final int COM_TIME = 15;
        public static final int COM_DELAYED_INSERT = 16;
        public static final int COM_CHANGE_USER = 17;
        public static final int COM_BINLOG_DUMP = 18;
        public static final int COM_TABLE_DUMP = 19;
        public static final int COM_CONNECT_OUT = 20;
        public static final int COM_REGISTER_SLAVE = 21;
        public static final int COM_STMT_PREPARE = 22;
        public static final int COM_STMT_EXECUTE = 23;
        public static final int COM_STMT_SEND_LONG_DATA = 24;
        public static final int COM_STMT_CLOSE = 25;
        public static final int COM_STMT_RESET = 26;
        public static final int COM_SET_OPTION = 27;
        public static final int COM_STMT_FETCH = 28;
        public static final int COM_DAEMON = 29;
        public static final int COM_BINLOG_DUMP_GTID = 30;
        public static final int COM_RESET_CONNECTION = 31;
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/capability-flags.html#packet-Protocol::CapabilityFlags
     * @see https://github.com/mysql/mysql-server/blob/HEAD/include/mysql_com.h
     */
    public static class CapabilitiesConstants {
        public static final long CLIENT_LONG_PASSWORD = 1;        /* new more secure passwords */
        public static final long CLIENT_FOUND_ROWS = 2;           /* Found instead of affected rows */
        public static final long CLIENT_LONG_FLAG = 4;            /* Get all column flags */
        public static final long CLIENT_CONNECT_WITH_DB = 8;      /* One can specify db on connect */
        public static final long CLIENT_NO_SCHEMA = 16;           /* Don't allow database.table.column */
        public static final long CLIENT_COMPRESS = 32;            /* Can use compression protocol */
        public static final long CLIENT_ODBC = 64;                /* Odbc client */
        public static final long CLIENT_LOCAL_FILES = 128;        /* Can use LOAD DATA LOCAL */
        public static final long CLIENT_IGNORE_SPACE = 256;       /* Ignore spaces before left brace */
        public static final long CLIENT_PROTOCOL_41 = 512;        /* New 4.1 protocol */
        public static final long CLIENT_INTERACTIVE = 1024;       /* This is an interactive client */
        public static final long CLIENT_SSL = 2048;               /* Switch to SSL after handshake */
        public static final long CLIENT_IGNORE_SIGPIPE = 4096;    /* IGNORE sigpipes */
        public static final long CLIENT_TRANSACTIONS = 8192;      /* Client knows about transactions */
        public static final long CLIENT_RESERVED = 16384;         /* Old flag for 4.1 protocol  */
        public static final long CLIENT_RESERVED2 = 32768;        /* Old flag for 4.1 authentication */
        public static final long CLIENT_MULTI_STATEMENTS = 1L << 16;  /* Enable/disable multi-stmt support */
        public static final long CLIENT_MULTI_RESULTS = 1L << 17;     /* Enable/disable multi-results */
        public static final long CLIENT_PS_MULTI_RESULTS = 1L << 18;  /* Multi-results in PS-protocol */
        public static final long CLIENT_PLUGIN_AUTH = 1L << 19;       /* Client supports plugin authentication */
        public static final long CLIENT_CONNECT_ATTRS = 1L << 20;     /* Client supports connection attributes */
        public static final long CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 1L << 21;
        public static final long CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS = 1L << 22;
        public static final long CLIENT_SESSION_TRACK = 1L << 23;
        public static final long CLIENT_DEPRECATE_EOF = 1L << 24;
        public static final long CLIENT_SSL_VERIFY_SERVER_CERT = 1L << 30;
        public static final long CLIENT_REMEMBER_OPTIONS = 1L << 31;
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/character-set.html#packet-Protocol::CharacterSet
     * @see 
     */
    public static class CharsetConstants {
        public static final int BIG5_CHINESE_CI = 1;
        public static final int LATIN2_CZECH_CS = 2;
        public static final int DEC8_SWEDISH_CI = 3;
        public static final int CP850_GENERAL_CI = 4;
        public static final int LATIN1_GERMAN1_CI = 5;
        public static final int HP8_ENGLISH_CI = 6;
        public static final int KOI8R_GENERAL_CI = 7;
        public static final int LATIN1_SWEDISH_CI = 8;
        public static final int LATIN2_GENERAL_CI = 9;
        public static final int SWE7_SWEDISH_CI = 10;
        public static final int UTF8_GENERAL_CI = 33;
        public static final int BINARY = 63;
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/status-flags.html#packet-Protocol::StatusFlags
     * @see https://github.com/mysql/mysql-server/blob/HEAD/include/mysql_com.h
     */
    public static class StatusConstants {
        public static final long SERVER_STATUS_IN_TRANS = 0x0001; //a transaction is active
        public static final long SERVER_STATUS_AUTOCOMMIT = 0x0002;	//auto-commit is enabled
        public static final long SERVER_MORE_RESULTS_EXISTS = 0x0008;
        public static final long SERVER_STATUS_NO_GOOD_INDEX_USED = 0x0010;
        public static final long SERVER_STATUS_NO_INDEX_USED = 0x0020;
        public static final long SERVER_STATUS_CURSOR_EXISTS = 0x0040;    //	Used by Binary Protocol Resultset to signal that COM_STMT_FETCH must be used to fetch the row-data.
        public static final long SERVER_STATUS_LAST_ROW_SENT = 0x0080;
        public static final long SERVER_STATUS_DB_DROPPED = 0x0100;
        public static final long SERVER_STATUS_NO_BACKSLASH_ESCAPES = 0x0200;
        public static final long SERVER_STATUS_METADATA_CHANGED = 0x0400;
        public static final long SERVER_QUERY_WAS_SLOW = 0x0800;
        public static final long SERVER_PS_OUT_PARAMS	 = 0x1000;
        public static final long SERVER_STATUS_IN_TRANS_READONLY = 0x2000;	//in a read-only transaction
        public static final long SERVER_SESSION_STATE_CHANGED = 0x4000;	//connection state information has changed
    }
}

Кроме того, для реализации понадобится класс, который будет помогать преобразовывать разные типы значений для протокола. MysqlByteBufUtil.java

package com.lrn.nettymysqlprotocol.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class MysqlByteBufUtil {
    /**
     * @see https://dev.mysql.com/doc/internals/en/integer.html
     * @return 
     */
    public static int getInt1Length() {
        return 1;
    }
    
    public static int getInt2Length() {
        return 2;
    }
    
    public static int getInt3Length() {
        return 3;
    }
    
    public static ByteBuf writeInt1(ByteBuf byteBuffer, long value) {
        return writeInt(byteBuffer, value, 1);
    }

    public static ByteBuf writeInt2(ByteBuf byteBuffer, long value) {
        return writeInt(byteBuffer, value, 2);
    }

    public static ByteBuf writeInt3(ByteBuf byteBuffer, long value) {
        return writeInt(byteBuffer, value, 3);
    }

    public static ByteBuf writeInt4(ByteBuf byteBuffer, long value) {
        return writeInt(byteBuffer, value, 4);
    }

    public static ByteBuf writeInt(ByteBuf buffer, long value, int size) {
        long newValue = value;

        for (int i=0;i<size;i++) {
            buffer.writeByte((int)(newValue & 0xFF));

            newValue = newValue >> 8;
        }

        return buffer;
    }

    public static long readInt1(ByteBuf buffer) {
        return readInt(buffer, 1);
    }
    
    public static long readInt2(ByteBuf buffer) {
        return readInt(buffer, 2);
    }
    
    public static long readInt3(ByteBuf buffer) {
        return readInt(buffer, 3);
    }
    
    public static long readInt4(ByteBuf buffer) {
        return readInt(buffer, 4);
    }
    
    public static long readInt(ByteBuf buffer, int size) {
        long value = 0;
        
        for (int i=0;i<size;i++) {
            long readedByte = buffer.readUnsignedByte();
            
            long newValue = readedByte << (8*i);
            
            value |= newValue;            
        }
        
        return value;
    }
    
    public static int getStringLength(byte[] val) {
        return val.length;
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/string.html
     * @param buffer
     * @param value
     * @return 
     */
    public static ByteBuf writeString(ByteBuf buffer, byte[] value) {
        buffer.writeBytes(value);

        return buffer;
    }

    public static byte[] readNullTerminatedString(ByteBuf bb) {
        ByteBuf readBuffer = Unpooled.buffer();
                
        byte b;
        do {
            b = bb.readByte();
            
            if (b != 0x00) {
                readBuffer.writeByte(b);
            }
        } while (b != 0x00);
                
        byte[] result = new byte[readBuffer.readableBytes()];
        
        readBuffer.readBytes(result);
        
        return result;
    }
    
    public static ByteBuf writeNullTerminatedString(ByteBuf buffer, byte[] value) {
        buffer.writeBytes(value);

        buffer.writeByte(0x00);

        return buffer;
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/integer.html#packet-Protocol::LengthEncodedInteger
     * @param value
     * @return 
     */
    public static long getLenencIntegerLength(long value) {
        if (value < 251) {
            return 1;
        } else {
            if (value >= 251 && value < 65536) {
                return 3;
            } else {
                if (value >= 65536 && value < 16777216) {
                    return 4;
                } else {
                    return 9;
                }
            }
        }
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/string.html#packet-Protocol::LengthEncodedString
     * @param data
     * @return 
     */
    public static long getLenencStringLength(byte[] data) {
        if (data == null) {
			return 1;
		} else {
			if (data.length > 250) {
				if(data.length>0xFFFFFF) {
					return 4 + data.length;
				} else if(data.length>0xFFFF) {
					return 3 + data.length;
				} else  {
					return 2 + data.length;
				}
			} else {
				return 1 + data.length;
			}
		}
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/integer.html#packet-Protocol::LengthEncodedInteger
     * @param out
     * @param value 
     */
    public static void writeLenencInteger(ByteBuf out, long value) {
        if (value < 251) {
            MysqlByteBufUtil.writeInt(out, value, 1);
        } else {
            if (value >= 251 && value < 65536) {
                out.writeByte(0xFC);
                MysqlByteBufUtil.writeInt(out, value, 2);
            } else {
                if (value >= 65536 && value < 16777216) {
                    out.writeByte(0xFD);
                    MysqlByteBufUtil.writeInt(out, value, 3);
                } else {
                    out.writeByte(0xFE);
                    MysqlByteBufUtil.writeInt(out, value, 8);
                }
            }
        }
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/string.html#packet-Protocol::LengthEncodedString
     * @param out
     * @param data 
     */
    public static void writeLenencString(ByteBuf out, byte[] data) {
        if (data == null) {
			out.writeByte((byte) 251);
		} else {
			if (data.length > 250) {
				if(data.length>0xFFFFFF) {
					out.writeByte((byte)(254));
                    MysqlByteBufUtil.writeInt(out, data.length, 4);
					
				} else if(data.length>0xFFFF) {
					out.writeByte((byte)(253));
                    MysqlByteBufUtil.writeInt(out, data.length, 3);                    
				} else  {
					out.writeByte((byte)(252));
                    MysqlByteBufUtil.writeInt(out, data.length, 2);
				}
				out.writeBytes(data);
			} else {
				out.writeByte((byte) data.length);
				out.writeBytes(data);
			}
		}
    }
}

Создадим простой класс для конвертации пакетов в byte buffer.

public class MysqlTranscoder {
    /**
     * @see https://dev.mysql.com/doc/internals/en/mysql-packet.html
     * @param packet
     * @param bb
     * @throws Exception 
     */
    public void encode(Packet packet, ByteBuf bb) throws Exception {
        if (packet != null && bb != null && bb.isWritable()) {
            packet.calculateAndSetBodyLength();

            MysqlByteBufUtil.writeInt3(bb, packet.getBodyLength());
            MysqlByteBufUtil.writeInt1(bb, packet.getSequenceNumber());
        
            packet.writeBody(bb);
        }
    }
}

Напишем простой тест transcoder'а.

public class MysqlTranscoderTest {
    protected MysqlTranscoder transcoder = new MysqlTranscoder();
    
    @Test
    public void test() throws Exception {
        ByteBuf buffer = Unpooled.buffer();
        
        transcoder.encode(new EmptyClass(0x12), buffer);
        
        assertEquals("0100000012", ByteBufUtil.hexDump(buffer));
    }
    
    public class EmptyClass extends BasePacket {
        protected int val;

        public EmptyClass(int val) {
            this.val = val;
        }
        
        @Override
        public void readBody(ByteBuf bb) throws Exception {
            this.val = (int)MysqlByteBufUtil.readInt1(bb);
        }

        @Override
        public void writeBody(ByteBuf bb) throws Exception {
            MysqlByteBufUtil.writeInt1(bb, val);
        }

        @Override
        public int calculateBodyLength() {
            return MysqlByteBufUtil.getInt1Length();
        }
    }
}

Теперь необходимо реализовать класс, который будет представлять команду OK. И тут мы сталкиваемся с проблемой, что пакет Ok может быть закодирован по разному. В зависимости от возможностей сервера и/или клиента пакеты могут кодироваться по разному.

Например, пакет OK.

Type    Name    Description
int<1>	header	[00] or [fe] the OK packet header
int<lenenc>	affected_rows	affected rows
int<lenenc>	last_insert_id	last insert-id
if capabilities & CLIENT_PROTOCOL_41 {
  int<2>	status_flags	Status Flags
  int<2>	warnings	number of warnings
} elseif capabilities & CLIENT_TRANSACTIONS {
  int<2>	status_flags	Status Flags
}
if capabilities & CLIENT_SESSION_TRACK {
  string<lenenc>	info	human readable status information
  if status_flags & SERVER_SESSION_STATE_CHANGED {
    string<lenenc>	session_state_changes	session state info
  }
} else {
  string<EOF>	info	human readable status information
}

Необходимо реализовать 2 класса: Capabilities и ServerStatus, которые будут хранить capability и текущий статус. Данные классы будут использоваться в классе TranscoderContext, который будет хранить контекст Transcoder'а для правильного кодирования/декодирования.

public class Capabilities {
    protected long capabilities;

    public long getCapabilities() {
        return capabilities;
    }

    public void setCapabilities(long capabilities) {
        this.capabilities = capabilities;
    }
    
    public boolean isClientLongPassword(){
        return (capabilities & MysqlConstants.CapabilitiesConstants.CLIENT_LONG_PASSWORD ) != 0;
    }
    public void setClientLongPassword(){
        capabilities = capabilities | MysqlConstants.CapabilitiesConstants.CLIENT_LONG_PASSWORD ;
    }
. . .
}
public class ServerStatus {
    protected long status;

    public long getStatus() {
        return status;
    }

    public void setStatus(long status) {
        this.status = status;
    }

    public boolean isServerStatusInTrans(){
        return (status & MysqlConstants.StatusConstants.SERVER_STATUS_IN_TRANS) != 0;
    }
    public void setServerStatusInTrans(){
        status = status | MysqlConstants.StatusConstants.SERVER_STATUS_IN_TRANS;
    }
. . .
}

Класс TranscoderContext.

public class TranscoderContext {
    protected Capabilities capabilities;

    protected ServerStatus serverStatus;
    
    public Capabilities getCapabilities() {
        return capabilities;
    }

    public void setCapabilities(Capabilities capabilities) {
        this.capabilities = capabilities;
    }

    public ServerStatus getServerStatus() {
        return serverStatus;
    }

    public void setServerStatus(ServerStatus serverStatus) {
        this.serverStatus = serverStatus;
    }   
}

Кроме того, необходимо внести изменения в transcoder, базовый класс для пакетов и интерфейс. Приведём изменения только transcoder'а.

public class MysqlTranscoder {
    
    protected TranscoderContext context;

    public TranscoderContext getContext() {
        return context;
    }

    public void setContext(TranscoderContext context) {
        this.context = context;
    }
    
    /**
     * @see https://dev.mysql.com/doc/internals/en/mysql-packet.html
     * @param packet
     * @param bb
     * @throws Exception 
     */
    public void encode(Packet packet, ByteBuf bb) throws Exception {
        if (packet != null && bb != null && bb.isWritable()) {
            packet.calculateAndSetBodyLength(context);

            MysqlByteBufUtil.writeInt3(bb, packet.getBodyLength());
            MysqlByteBufUtil.writeInt1(bb, packet.getSequenceNumber());
        
            packet.writeBody(bb, context);
        }
    }
}

Реализуем пакет Ok и InitialHandshake.

/**
 * @see https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
 */
public class OkPacket extends BasePacket {

    protected long affectedRows = 0;
    
    protected long lastInsertId = 0;
    
    protected String humanReadableStatus = "";

    protected String sessionStateChanges = "";

    protected long warningCount = 0;

    public long getWarningCount() {
        return warningCount;
    }

    public void setWarningCount(long warningCount) {
        this.warningCount = warningCount;
    }
    
    public String getSessionStateChanges() {
        return sessionStateChanges;
    }

    public void setSessionStateChanges(String sessionStateChanges) {
        this.sessionStateChanges = sessionStateChanges;
    }
    
    public String getHumanReadableStatus() {
        return humanReadableStatus;
    }

    public void setHumanReadableStatus(String humanReadableStatus) {
        this.humanReadableStatus = humanReadableStatus;
    }
    
    public long getAffectedRows() {
        return affectedRows;
    }

    public void setAffectedRows(long affectedRows) {
        this.affectedRows = affectedRows;
    }

    public long getLastInsertId() {
        return lastInsertId;
    }

    public void setLastInsertId(long lastInsertId) {
        this.lastInsertId = lastInsertId;
    }
    
    @Override
    public void readBody(ByteBuf bb, TranscoderContext context) throws Exception {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void writeBody(ByteBuf bb, TranscoderContext context) throws Exception {
        Capabilities capabilities = context.getCapabilities();
        ServerStatus status = context.getServerStatus();
        
        // int<1>	header	[00] or [fe] the OK packet header
        MysqlByteBufUtil.writeInt1(bb, 0x00); 
        //int<lenenc>	affected_rows	affected rows
        MysqlByteBufUtil.writeLenencInteger(bb, getAffectedRows()); 
        //int<lenenc>	last_insert_id	last insert-id
        MysqlByteBufUtil.writeLenencInteger(bb, getLastInsertId()); 
        
        if (capabilities.isClientProtocol41()) {  //if capabilities & CLIENT_PROTOCOL_41 {
            //  int<2>	status_flags	Status Flags
            MysqlByteBufUtil.writeInt2(bb, status.getStatus()); 
            //  int<2>	warnings	number of warnings
            MysqlByteBufUtil.writeInt2(bb, getWarningCount()); 
        } else {
            if (capabilities.isClientTransactions()) { //} elseif capabilities & CLIENT_TRANSACTIONS {
                //  int<2>	status_flags	Status Flags
                MysqlByteBufUtil.writeInt2(bb, status.getStatus()); 
            } //}
        }
        
        if (capabilities.isClientSessionTrack()) { //if capabilities & CLIENT_SESSION_TRACK {
            //  string<lenenc>	info	human readable status information
            MysqlByteBufUtil.writeLenencString(bb, getHumanReadableStatus().getBytes()); 
            
            //  if status_flags & SERVER_SESSION_STATE_CHANGED {
            if (status.isServerSessionStateChanged()) { 
                //    string<lenenc>	session_state_changes	session state info
                MysqlByteBufUtil.writeLenencString(bb, getSessionStateChanges().getBytes()); 
            }
        } else {
            //  string<EOF>	info	human readable status information
            MysqlByteBufUtil.writeStringEof(bb, getHumanReadableStatus().getBytes()); 
        }
    }

    @Override
    public int calculateBodyLength(TranscoderContext context) {
        Capabilities capabilities = context.getCapabilities();
        ServerStatus status = context.getServerStatus();
        
        int result = 0;
        
        // int<1>	header	[00] or [fe] the OK packet header
        result += 1; 
        //int<lenenc>	affected_rows	affected rows
        result += MysqlByteBufUtil.getLenencIntegerLength(getAffectedRows()); 
        //int<lenenc>	last_insert_id	last insert-id
        result += MysqlByteBufUtil.getLenencIntegerLength(getLastInsertId()); 
        
        if (capabilities.isClientProtocol41()) {  //if capabilities & CLIENT_PROTOCOL_41 {
            //  int<2>	status_flags	Status Flags
            result += 2; 
            //  int<2>	warnings	number of warnings
            result += 2; 
        } else {
            if (capabilities.isClientTransactions()) { //} elseif capabilities & CLIENT_TRANSACTIONS {
                //  int<2>	status_flags	Status Flags
                result += 2; 
            } //}
        }
        
        if (capabilities.isClientSessionTrack()) { //if capabilities & CLIENT_SESSION_TRACK {
            //  string<lenenc>	info	human readable status information
            result += MysqlByteBufUtil.getLenencStringLength(getHumanReadableStatus().getBytes()); 
            
            //  if status_flags & SERVER_SESSION_STATE_CHANGED {
            if (status.isServerSessionStateChanged()) { 
                //    string<lenenc>	session_state_changes	session state info
                result += MysqlByteBufUtil.getLenencStringLength(getSessionStateChanges().getBytes()); 
            }
        } else {
            //  string<EOF>	info	human readable status information
            result += MysqlByteBufUtil.getStringEofLength(getHumanReadableStatus().getBytes()); 
        }

        return result;
    }
}
public class InitialHandshakePacket extends BasePacket {

    protected int protocolVersion = 0x0a;
    protected String serverName = "Fake mysql server v1.0";
    protected long connectionId;
    protected byte[] scramble;
    protected Capabilities capabilities = new Capabilities();
    protected ServerStatus status = new ServerStatus();
    protected int characterSet = MysqlConstants.CharsetConstants.UTF8_GENERAL_CI;
    protected String authPluginName;

    public int getProtocolVersion() {
        return protocolVersion;
    }

    public void setProtocolVersion(int protocolVersion) {
        this.protocolVersion = protocolVersion;
    }

    public String getServerName() {
        return serverName;
    }

    public void setServerName(String serverName) {
        this.serverName = serverName;
    }

    public long getConnectionId() {
        return connectionId;
    }

    public void setConnectionId(long connectionId) {
        this.connectionId = connectionId;
    }

    public byte[] getScramble() {
        return scramble;
    }

    public void setScramble(byte[] scramble) {
        this.scramble = scramble;
    }

    public Capabilities getCapabilities() {
        return capabilities;
    }

    public void setCapabilities(Capabilities capabilities) {
        this.capabilities = capabilities;
    }

    public ServerStatus getStatus() {
        return status;
    }

    public void setStatus(ServerStatus status) {
        this.status = status;
    }

    public int getCharacterSet() {
        return characterSet;
    }

    public void setCharacterSet(int characterSet) {
        this.characterSet = characterSet;
    }

    public String getAuthPluginName() {
        return authPluginName;
    }

    public void setAuthPluginName(String authPluginName) {
        this.authPluginName = authPluginName;
    }
    
    @Override
    public void readBody(ByteBuf bb, TranscoderContext context) throws Exception {
        throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
    }

    @Override
    public void writeBody(ByteBuf bb, TranscoderContext context) throws Exception {
        // [0a] protocol version
        MysqlByteBufUtil.writeInt1(bb, getProtocolVersion());
        //string[NUL]    server version
        MysqlByteBufUtil.writeNullTerminatedString(bb, getServerName().getBytes());
        //4              connection id
        MysqlByteBufUtil.writeInt4(bb, getConnectionId());
        //string[8]      auth-plugin-data-part-1
        MysqlByteBufUtil.writeLengthedString(bb, getScramble(), 8);
        //1              [00] filler
        MysqlByteBufUtil.writeInt1(bb, 0x00);
        //2              capability flags (lower 2 bytes)
        MysqlByteBufUtil.writeInt2(bb, getCapabilities().getLowWord());
        
        //1              character set
        MysqlByteBufUtil.writeInt1(bb, getCharacterSet());
        //2              status flags
        MysqlByteBufUtil.writeInt2(bb, getStatus().getStatus());
        //2              capability flags (upper 2 bytes)
        MysqlByteBufUtil.writeInt2(bb, getCapabilities().getHighWord());
        
        if (capabilities != null && capabilities.isClientPluginAuth()) {
            //1              length of auth-plugin-data
            MysqlByteBufUtil.writeInt1(bb, getAuthPluginName().getBytes().length);
        } else {
            //1              [00] filler
            MysqlByteBufUtil.writeInt1(bb, 0x00);
        }
        
        //string[10]     reserved (all [00])
        bb.writeZero(10);
        
        if (capabilities.isClientSecureConnection()) {
            // string[$len]   auth-plugin-data-part-2 ($len=MAX(13, length of auth-plugin-data - 8))
            bb.writeBytes(getScramble(), 8, 12);
            bb.writeByte(0x00);
        }
        
        if (capabilities.isClientPluginAuth()) {
            // string[NUL]    auth-plugin name
            MysqlByteBufUtil.writeNullTerminatedString(bb, getAuthPluginName().getBytes());
        }
    }

    @Override
    public int calculateBodyLength(TranscoderContext context) {
        int result = 0;
        
        // [0a] protocol version
        result += 1;
        //string[NUL]    server version
        result += MysqlByteBufUtil.getNullTerminatedStringLength(getServerName().getBytes());
        //4              connection id
        result += 4;
        //string[8]      auth-plugin-data-part-1
        result += 8;
        //1              [00] filler
        result += 1;
        //2              capability flags (lower 2 bytes)
        result += 2;
        
        //1              character set
        result += 1;
        //2              status flags
        result += 2;
        //2              capability flags (upper 2 bytes)
        result += 2;
        
        if (capabilities != null && capabilities.isClientPluginAuth()) {
            //1              length of auth-plugin-data
            result += 1;
        } else {
            //1              [00] filler
            result += 1;
        }
        
        //string[10]     reserved (all [00])
        result += 10;
        
        if (capabilities.isClientSecureConnection()) {
            // string[$len]   auth-plugin-data-part-2 ($len=MAX(13, length of auth-plugin-data - 8))
            result += 13;
        }
        
        if (capabilities.isClientPluginAuth()) {
            // string[NUL]    auth-plugin name
            result += MysqlByteBufUtil.getNullTerminatedStringLength(getAuthPluginName().getBytes());
        }
        
        return result;
    }    
}

Соответствующие тесты.

public class OkPacketTest {
    
    protected MysqlTranscoder transcoder;
    
    protected TranscoderContext transcoderContext;
    
    protected Capabilities capabilities;
    
    protected ServerStatus serverStatus;
    
    @Before
    public void before() {
        transcoder = new MysqlTranscoder();
        transcoderContext = new TranscoderContext();
        transcoder.setContext(transcoderContext);
        
        capabilities = transcoderContext.getCapabilities();
        serverStatus = transcoderContext.getServerStatus();
    }
    
    @Test
    public void testSimpleOk() throws Exception {
        ByteBuf buffer = Unpooled.buffer();
        
        OkPacket okPacket = new OkPacket();
        okPacket.setSequenceNumber(2);
        
        capabilities.setClientProtocol41();
        serverStatus.setServerStatusAutocommit();
        
        transcoder.encode(okPacket, buffer);
        
        assertEquals("0700000200000002000000", ByteBufUtil.hexDump(buffer));
    }
    
     @Test
    public void testSimpleOk2() throws Exception {
        ByteBuf buffer = Unpooled.buffer();
        
        OkPacket okPacket = new OkPacket();
        okPacket.setSequenceNumber(1);
        
        capabilities.setClientProtocol41();
        serverStatus.setServerStatusAutocommit();
        
        okPacket.setAffectedRows(1);
        okPacket.setLastInsertId(2);
        
        transcoder.encode(okPacket, buffer);
        
        assertEquals("0700000100010202000000", ByteBufUtil.hexDump(buffer));
    }
}
public class InitialHandshakePacketTest {
    
    protected MysqlTranscoder transcoder;
    
    protected TranscoderContext transcoderContext;
    
    protected Capabilities capabilities;
    
    protected ServerStatus serverStatus;
    
    public static final String FAKE_MY_SQL_SERVER_VERSION = "5.6.28-0ubuntu0.15.04.1";
    
    @Before
    public void before() {
        transcoder = new MysqlTranscoder();
        transcoderContext = new TranscoderContext();
        transcoder.setContext(transcoderContext);
        
        capabilities = transcoderContext.getCapabilities();
        serverStatus = transcoderContext.getServerStatus();
    }
    
    @Test
    public void testSimpleOk() throws Exception {
        InitialHandshakePacket initialHandshake = new InitialHandshakePacket();

        capabilities.setCapabilities(0x807ff7ff);
        serverStatus.setStatus(0x0002);
        
        initialHandshake.setSequenceNumber(0);
        
        initialHandshake.setProtocolVersion(10);
        initialHandshake.setConnectionId(11);
        initialHandshake.setServerName(FAKE_MY_SQL_SERVER_VERSION);
        initialHandshake.setAuthPluginName("mysql_native_password");
        initialHandshake.setScramble("e'1^hnqkHc<@jx=c])Q>".getBytes());
        initialHandshake.setCharacterSet(MysqlConstants.CharsetConstants.LATIN1_SWEDISH_CI);
        initialHandshake.setCapabilities(capabilities);
        initialHandshake.setStatus(serverStatus);

        ByteBuf buffer = Unpooled.buffer();

        transcoder.encode(initialHandshake, buffer);

        String expected =
             "5b0000000a352e362e32382d30756275"
            +"6e7475302e31352e30342e31000b0000"
            +"006527315e686e716b00fff70802007f"
            +"80150000000000000000000048633c40"
            +"6a783d635d29513e006d7973716c5f6e"
            +"61746976655f70617373776f726400";

        String hexBuf = ByteBufUtil.hexDump(buffer);
        
        assertEquals(expected.toLowerCase(), hexBuf.toLowerCase());
    }
}

Цель №4: Создать сервер, который принимает команды COM_QUERY

Команда COM_QUERY - это команда, которая содержит запрос на выполнение. Результат данной команды - это результат выполнения запроса.

Описание команды - https://dev.mysql.com/doc/internals/en/com-query.html.

На данном шаге необходимо реализовать пакет COM_QUERY.

Сам запрос очень просто. Идентификатор пакета и сам запрос.

1              [03] COM_QUERY
string[EOF]    the query the server shall execute

Реализуем данный пакет.

public class ComQueryPacket extends BasePacket {

    protected String query;

    public String getQuery() {
        return query;
    }

    public void setQuery(String query) {
        this.query = query;
    }

    @Override
    public void readBody(ByteBuf bb, TranscoderContext context) throws Exception {
        int len = getBodyLength() - 1;

        setQuery(new String(MysqlByteBufUtil.readEofString(bb, len), "UTF-8"));
    }

    @Override
    public void writeBody(ByteBuf bb, TranscoderContext context) throws Exception {
        MysqlByteBufUtil.writeInt1(bb, MysqlConstants.CommandConstants.COM_QUERY);
        MysqlByteBufUtil.writeStringEof(bb, getQuery().getBytes());
    }

    @Override
    public int calculateBodyLength(TranscoderContext context) {
        int result = 1;

        result+= MysqlByteBufUtil.getStringEofLength(getQuery().getBytes());

        return result;
    }

    @Override
    public String toString() {
        return "ComQueryPacket{" + "query=" + query + '}';
    }
}

Реализуем соответствующий тест:

public class ComQueryPacketTest {

    protected MysqlTranscoder transcoder;

    protected TranscoderContext transcoderContext;

    protected Capabilities capabilities;

    protected ServerStatus serverStatus;

    @Before
    public void before() {
        transcoder = new MysqlTranscoder();
        transcoderContext = new TranscoderContext();

        transcoderContext.setCommandPhase();

        transcoder.setContext(transcoderContext);

        capabilities = transcoderContext.getCapabilities();
        serverStatus = transcoderContext.getServerStatus();
    }

    @Test
    public void testRead() throws Exception {
        String queryStr = "210000000373656c65637420404076657273696f6e5f636f6d6d656e74206c696d69742031";

        byte[] byteQuery = HexUtils.hexToByte(queryStr);

        ByteBuf writeByteBuffer = Unpooled.buffer().writeBytes(byteQuery);

        ComQueryPacket cqp = (ComQueryPacket) transcoder.decode(writeByteBuffer);

        Assert.assertEquals("select @@version_comment limit 1", cqp.getQuery());
    }

    @Test
    public void testWrite() throws Exception {
        ComQueryPacket comQueryPacket = new ComQueryPacket();
        comQueryPacket.setQuery("select @@version_comment limit 1");

        ByteBuf buffer = Unpooled.buffer();

        transcoder.encode(comQueryPacket, buffer);

        String expected = "210000000373656c65637420404076657273696f6e5f636f6d6d656e74206c696d69742031";

        String hexBuf = ByteBufUtil.hexDump(buffer);

        assertEquals(expected.toLowerCase(), hexBuf.toLowerCase());
    }
}

Далее перепишем сервер таким образом, чтобы отделить обработчики и код самого сервера для расширяемости. Необходимо реализовать следующую схему работы:

schema

MysqlServer - это основной класс, который будет создавать listen-сервер, настраивать сетевые параметры, устанавливать ChannelInitializer и т.д.

MysqlServerHandler - это интерфейс обработчика, который будет заниматься обработкой поступивших команд.

DefaultMysqlServerHandler - класс обработчика по умолчанию, который только логирует приходящие команды. Может быть использован про написании собственных обработчиков, чтобы не переписывать всю реализацию, а реализовать только несколько методов.

Кроме того, в описании протокола Mysql есть разделение на фазы работы:

  • Auth Phase - фаза аутентификации
  • Command Phase - фаза команд

Фаза влияет на декодирование пакетов. Таким образом, данная информация очень важна для декодера пакетов.

Для начала добавим в Transсoder хранения текущей фазы подключения.

public class TranscoderContext {
    public static final Logger logger = LoggerFactory.getLogger(TranscoderContext.class);
    
    protected Capabilities capabilities = new Capabilities();

    protected ServerStatus serverStatus = new ServerStatus();

    protected Class phase = AuthPhase.class;

    public Capabilities getCapabilities() {
        return capabilities;
    }

    public void setCapabilities(Capabilities capabilities) {
        this.capabilities = capabilities;
    }

    public ServerStatus getServerStatus() {
        return serverStatus;
    }

    public void setServerStatus(ServerStatus serverStatus) {
        this.serverStatus = serverStatus;
    }

    public boolean isAuthPhase() {
        return this.phase == AuthPhase.class;
    }

    public boolean isCommandPhase() {
        return this.phase == CommandPhase.class;
    }

    public void setAuthPhase() {
        this.phase = AuthPhase.class;
    }

    public void setCommandPhase() {
        this.phase = CommandPhase.class;
    }


    public static class AuthPhase {

    }

    public static class CommandPhase {

    }
}

По умолчанию выбрана AuthPhase, но в процессе работы фаза может быть переопределена.

Реализуем класс MysqlServer'а, который реализует всю работу по созданию сервера в Netty.

public class MysqlServer {
    public static final Logger logger = LoggerFactory.getLogger(App.class);
    
    protected EventLoopGroup bossGroup;
    protected EventLoopGroup workerGroup;
    protected int port = 3306;
    protected MysqlServerHandler serverHandler;
    protected String serverName = "AMCF(A1S MySQL connection framework)";
    protected String version = "0.0.1";
    
    public MysqlServer() {
    }
    
    public MysqlServer(int port) {
        this.port = port;
    }
    
    public void run() {
        if (bossGroup == null) {
            bossGroup = new NioEventLoopGroup();
        }
        
        if (workerGroup == null) {
            workerGroup = new NioEventLoopGroup();
        }
        
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new DefaultChannelInitializer(this))
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();

            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("Error on create bootstrap", e);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public String getServerDesription() {
        return getServerName()+"-"+getVersion();
    }
    
    public String getServerName() {
        return serverName;
    }

    public void setServerName(String serverName) {
        this.serverName = serverName;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }    
    
    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
    
    public EventLoopGroup getBossGroup() {
        return bossGroup;
    }

    public void setBossGroup(EventLoopGroup bossGroup) {
        this.bossGroup = bossGroup;
    }

    public EventLoopGroup getWorkerGroup() {
        return workerGroup;
    }

    public void setWorkerGroup(EventLoopGroup workerGroup) {
        this.workerGroup = workerGroup;
    }

    public MysqlServerHandler getServerHandler() {
        return serverHandler;
    }

    public void setServerHandler(MysqlServerHandler serverHandler) {
        this.serverHandler = serverHandler;
    }    
}

Заготовка для MysqlServerHandler

/**
 * 
 * @TODO: devide to connection handler and server handler
 */
public interface MysqlServerHandler {
    MysqlConnectionHandler onClientConnect(Channel channel);
    
    void onClientDisconnect(Channel channel);
    
    void onLogin();
}

Реализуем ChannelInitializer, который создаёт все необходимые объекты и отправляет приветствие в ответ на подключение.

public class DefaultChannelInitializer extends ChannelInitializer<SocketChannel> {
    public static final Logger logger = LoggerFactory.getLogger(DefaultChannelInitializer.class);

    protected final static Random random = new SecureRandom();
    
    protected final static AtomicLong connectionId = new AtomicLong(0);
    
    protected MysqlServer server;

    public DefaultChannelInitializer(MysqlServer server) {
        this.server = server;
    }
    
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        logger.info("Client connected from:{}", ch.remoteAddress());

        MysqlConnectionHandler connectionHandler = server.getServerHandler().onClientConnect(ch);
        
        ByteBuf greetingBuffer = ch.alloc().buffer();

        MysqlTranscoder transcoder = new MysqlTranscoder();

        transcoder.setContext(new TranscoderContext());

        transcoder.getContext().getCapabilities().setCapabilities(0x807ff7ff);
        transcoder.getContext().getServerStatus().setStatus(0x0002);

        InitialHandshakePacket greet = new InitialHandshakePacket();
        
        random.setSeed(System.currentTimeMillis());
        
        byte[] scramble = new byte[20];
        
        random.nextBytes(scramble);
        
        greet.setScramble(scramble);
        greet.setCharacterSet(MysqlConstants.CharsetConstants.UTF8_GENERAL_CI);
                        
        greet.setConnectionId(connectionId.incrementAndGet());
        greet.setServerName(getServer().getServerDesription());
        greet.setSequenceNumber(0);

        transcoder.encode(greet, greetingBuffer);

        ch.writeAndFlush(greetingBuffer);

        AuthPhaseServerHandler authPhaseServerHandler = new AuthPhaseServerHandler(transcoder);        
        authPhaseServerHandler.setServer(server);
        
        authPhaseServerHandler.setConnectionHandler(connectionHandler);
        
        ch.pipeline().addLast(authPhaseServerHandler);
           
        ch.closeFuture().addListener(new GenericFutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {                                
                server.getServerHandler().onClientDisconnect(ch);                
            }
        });
    }

    public MysqlServer getServer() {
        return server;
    }

    public void setServer(MysqlServer server) {
        this.server = server;
    }
}

DefaultChannelInitializer пока хранит кол-во подключений, f так же настраивает обработчик на закрытие канала.

ch.closeFuture().addListener(new GenericFutureListener() {
    @Override
    public void operationComplete(Future future) throws Exception {                                
        server.getServerHandler().onClientDisconnect(ch);                
    }
});

Кроме того, DefaultChannelInitializer добавляет обработчик AuthPhaseServerHandler, который является обработчик канала Netty на время фазы аутентификации.

После того, как клиент отправил пакет Login. AuthPhaseServerEncoder удаляет себя из channel pipeline и добавляет обработчик фазы команд, настраивая, текущую фазу как фазу команд.

public class AuthPhaseServerHandler extends ChannelInboundHandlerAdapter {

    public static final Logger logger = LoggerFactory.getLogger(AuthPhaseServerHandler.class);

    protected MysqlTranscoder transcoder = null;

    protected MysqlServer server;

    protected MysqlConnectionHandler connectionHandler;

    public AuthPhaseServerHandler(MysqlTranscoder transcoder) {
        this.transcoder = transcoder;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        logger.debug("channel read");

        ByteBuf byteBuf = (ByteBuf)msg;

        if (logger.isTraceEnabled()) {
            logger.trace("\n{}", ByteBufUtil.prettyHexDump(byteBuf));
        }

        OkPacket okPacket = new OkPacket();
        okPacket.setAffectedRows(0);
        okPacket.setSequenceNumber(2);

        ByteBuf buffer = ctx.alloc().buffer();

        try {
            transcoder.encode(okPacket, buffer);
        } catch (Exception e) {

        }

        logger.trace("\n{}", ByteBufUtil.prettyHexDump(buffer));

        ctx.writeAndFlush(buffer);

        ((ByteBuf) msg).release();

        ctx.pipeline().remove(this);

        transcoder.getContext().setCommandPhase();

        // inbound handlers
        ctx.pipeline().addFirst(new ByteToMysqlPacketDecoder(transcoder));
        DefaultServerHandler defaultServerHandler = new DefaultServerHandler(transcoder);
        defaultServerHandler.setServerHandler(getServer().getServerHandler());
        defaultServerHandler.setConnectionHandler(getConnectionHandler());

        ctx.pipeline().addLast(defaultServerHandler);

        // out bound handlers
        ctx.pipeline().addFirst(new MysqlPacketToByteEncoder(transcoder));
        ctx.pipeline().addLast(new ServerObjectToPacketEncoder(transcoder));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.debug("Exception", cause);

        cause.printStackTrace();
        ctx.close();
    }

    public MysqlTranscoder getTranscoder() {
        return transcoder;
    }

    public void setTranscoder(MysqlTranscoder transcoder) {
        this.transcoder = transcoder;
    }

    public MysqlServer getServer() {
        return server;
    }

    public void setServer(MysqlServer server) {
        this.server = server;
    }

    public MysqlConnectionHandler getConnectionHandler() {
        return connectionHandler;
    }

    public void setConnectionHandler(MysqlConnectionHandler connectionHandler) {
        this.connectionHandler = connectionHandler;
    }
}

Необходимо реализовать и другие обработчики, которые добавляются в channel pipeline.

ByteToMysqlPacketDecoder - это декодер сообщений. Его задача преобразовывать байты из сети в соответствующие объекты.

public class ByteToMysqlPacketDecoder extends ByteToMessageDecoder {

    public static final Logger logger = LoggerFactory.getLogger(ByteToMysqlPacketDecoder.class);

    protected MysqlTranscoder transcoder = new MysqlTranscoder();

    public ByteToMysqlPacketDecoder() {
    }

    public ByteToMysqlPacketDecoder(MysqlTranscoder transcoder) {
        this.transcoder = transcoder;
    }

    @Override
    protected void decode(ChannelHandlerContext chc, ByteBuf bb, List<Object> list) throws Exception {
        Packet bp = transcoder.decode(bb);

        if (bp != null) {
            list.add(bp);
        }
    }
}

DefaultServerHandler - это обработчик уже пакетов MySQL, который делегирует управление соответствующему обработчику ConnectionHandler.

public class DefaultServerHandler extends ChannelInboundHandlerAdapter {

    public static final Logger logger = LoggerFactory.getLogger(DefaultServerHandler.class);

    protected MysqlTranscoder transcoder = null;
    
    protected MysqlServerHandler serverHandler;
    
    protected MysqlConnectionHandler connectionHandler;
    
    public DefaultServerHandler(MysqlTranscoder transcoder) {
        this.transcoder = transcoder;
    }    
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        logger.debug("channel read");

        logger.debug("Read msg {}", msg);
                
        if (msg instanceof ComQueryPacket) {
            try {
                ServerObject serverObject = connectionHandler.onQuery(((ComQueryPacket) msg).getQuery());
                       
                ctx.channel().writeAndFlush(serverObject);
            } catch (ServerObjectException e) {
                ctx.channel().writeAndFlush(e);
            }
        }
        
        if (msg instanceof ComInitDbPacket) {
            try {
                ServerObject serverObject = connectionHandler.initDb(((ComInitDbPacket) msg).getSchemaName());
                       
                ctx.channel().writeAndFlush(serverObject);
            } catch (ServerObjectException e) {
                ctx.channel().writeAndFlush(e);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.debug("Exception", cause);

        cause.printStackTrace();
        ctx.close();
    }

    public MysqlTranscoder getTranscoder() {
        return transcoder;
    }

    public void setTranscoder(MysqlTranscoder transcoder) {
        this.transcoder = transcoder;
    }

    public MysqlServerHandler getServerHandler() {
        return serverHandler;
    }

    public void setServerHandler(MysqlServerHandler serverHandler) {
        this.serverHandler = serverHandler;
    }

    public MysqlConnectionHandler getConnectionHandler() {
        return connectionHandler;
    }

    public void setConnectionHandler(MysqlConnectionHandler connectionHandler) {
        this.connectionHandler = connectionHandler;
    }
}

MysqlConnectionHandler - это интерфейс, который описывает обработчик команд.

public interface MysqlConnectionHandler {
    ServerObject onQuery(String query);
    
    ServerObject initDb(String schemaName);
}

MysqlPacketToByteEncoder - это класс кодировщика, который преобразовывает пакеты в байты.

public class MysqlPacketToByteEncoder extends MessageToByteEncoder<Packet> {

    public static final Logger logger = LoggerFactory.getLogger(MysqlPacketToByteEncoder.class);
    
    protected MysqlTranscoder transcoder = new MysqlTranscoder();

    public MysqlPacketToByteEncoder(MysqlTranscoder transcoder) {
        this.transcoder = transcoder;
    }
    
    public MysqlTranscoder getTranscoder() {
        return transcoder;
    }

    public void setTranscoder(MysqlTranscoder transcoder) {
        this.transcoder = transcoder;
    }
    
    @Override
    protected void encode(ChannelHandlerContext ctx, Packet msg, ByteBuf out) throws Exception {
        if (msg instanceof Packet) {
            transcoder.encode((Packet)msg, out);
        }
    }
}

ServerObject - это маркирующий интерфейс, которым будут отмечены классы, которые реализуют результат работы обработчиков.

ServerObjectException - это класс исключения, которые необходим для отправки пакетов ErrPacket от сервера клиенту.

public class ServerObjectException extends RuntimeException implements ServerObject {
    protected int errorCode;
    
    protected String errorMessage;

    protected String sqlState;
    
    public int getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(int errorCode) {
        this.errorCode = errorCode;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
    } 

    public String getSqlState() {
        return sqlState;
    }

    public void setSqlState(String sqlState) {
        this.sqlState = sqlState;
    }    
}

ErrPacket - информация об ошибке.

/**
 *
 * @see https://dev.mysql.com/doc/internals/en/packet-ERR_Packet.html
 */
public class ErrPacket extends BasePacket {
    protected long errorCode;
    protected String sqlState;
    protected String errorMessage;

    public long getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(long errorCode) {
        this.errorCode = errorCode;
    }

    public String getSqlState() {
        return sqlState;
    }

    public void setSqlState(String sqlState) {
        this.sqlState = sqlState;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
    }

    @Override
    public void readBody(ByteBuf bb, TranscoderContext context) throws Exception {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public void writeBody(ByteBuf bb, TranscoderContext context) throws Exception {
        MysqlByteBufUtil.writeInt1(bb, 0xff); // [ff] header of the ERR packet
        MysqlByteBufUtil.writeInt2(bb, getErrorCode()); // error_code 	error-code

        if (
            context != null
            && context.getCapabilities() != null
            && context.getCapabilities().isClientProtocol41()
        ) {
            MysqlByteBufUtil.writeString(bb, "#".getBytes());
            MysqlByteBufUtil.writeLengthedString(bb, getSqlState().getBytes(), 5);
        }

        MysqlByteBufUtil.writeStringEof(bb, getErrorMessage().getBytes());
    }

    @Override
    public int calculateBodyLength(TranscoderContext context) {
        int result =
                1       // header 	[ff] header of the ERR packet
                + 2;    //error_code 	error-code

        if (
            context != null
            && context.getCapabilities() != null
            && context.getCapabilities().isClientProtocol41()
        ) {
            result += 1; // sql_state_marker 	# marker of the SQL State
            result += 5; // sql_state 	SQL State
        }

        result += getErrorMessage().getBytes().length;

        return result;
    }
}

Тест:

public class ErrPacketTest {

    protected MysqlTranscoder transcoder;

    protected TranscoderContext transcoderContext;

    protected Capabilities capabilities;

    protected ServerStatus serverStatus;

    @Before
    public void before() {
        transcoder = new MysqlTranscoder();
        transcoderContext = new TranscoderContext();

        transcoderContext.setCommandPhase();

        transcoder.setContext(transcoderContext);

        capabilities = transcoderContext.getCapabilities();
        serverStatus = transcoderContext.getServerStatus();
    }


    @Test
    public void testErrPacket() throws Exception {
        ByteBuf buffer = Unpooled.buffer();

        ErrPacket errPacket = new ErrPacket();
        errPacket.setErrorCode(0x0448);
        errPacket.setSqlState("HY000");
        errPacket.setErrorMessage("No tables used");
        errPacket.setSequenceNumber(1);
        
        capabilities.setClientProtocol41();

        transcoder.encode(errPacket, buffer);

        assertEquals("17000001ff48042348593030304e6f207461626c65732075736564", ByteBufUtil.hexDump(buffer));
    }
}

EofPacket - это специальный пакет, который используется для завершения последовательности пакетов.

/**
 *
 * @see https://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
 */
public class EofCommand extends BasePacket{

    protected long warnings;

    protected ServerStatus status = new ServerStatus();

    public ServerStatus getStatus() {
        return status;
    }

    public void setStatus(ServerStatus status) {
        this.status = status;
    }

    public long getWarnings() {
        return warnings;
    }

    public void setWarnings(long warnings) {
        this.warnings = warnings;
    }

    @Override
    public void readBody(ByteBuf bb, TranscoderContext context) throws Exception {
        setWarnings(MysqlByteBufUtil.readInt2(bb));
        getStatus().setStatus(MysqlByteBufUtil.readInt2(bb));
    }

    @Override
    public void writeBody(ByteBuf bb, TranscoderContext context) throws Exception {
        bb.writeByte(0xfe);

        MysqlByteBufUtil.writeInt2(bb, getWarnings());
        MysqlByteBufUtil.writeInt2(bb, context.getServerStatus().getStatus());
    }

    @Override
    public int calculateBodyLength(TranscoderContext context) {
        return
            1       // header
            + 2     // warnings
            + 2;    // status flags
    }
}

Тест

public class EofPacketTest {

    protected MysqlTranscoder transcoder;

    protected TranscoderContext transcoderContext;

    protected Capabilities capabilities;

    protected ServerStatus serverStatus;

    @Before
    public void before() {
        transcoder = new MysqlTranscoder();
        transcoderContext = new TranscoderContext();

        transcoderContext.setCommandPhase();

        transcoder.setContext(transcoderContext);

        capabilities = transcoderContext.getCapabilities();
        serverStatus = transcoderContext.getServerStatus();
    }
}

Цель №5: Создать сервер, который обрабатывает заранее заданную команду и возвращает результат.

Цель №6: Попробовать зайти на сервер с использованием mysql-cli

Цель №7: Попробовать интеграцию с php.

Цель №8: Создать простой клиент, который подключается к серверу Mysql.

Цель №9: Создать простой клиент, который проходит авторизацию в mysql.

Цель №10: Создать простой клиент, который отправляет запрос COM_QUERY и интерпретирует результаты.

Цель №11: Воспользоваться библиотекой jSQLParser для реализации простого SQL-сервера.

Clone this wiki locally