亚马逊AWS官方博客

利用 VPC Traffic Mirroring 进行数据库流量复制

一、前言

Amazon Relational Database Service(Amazon RDS)是一项托管式数据库服务,让用户能够在 AWS Cloud 云中更轻松地设置、操作和扩展关系数据库,可为用户提供一个经济有效、容量可调的符合行业标准的关系数据库,并承担常见的数据库管理任务,使您可以专注于应用程序和用户。

在日常使用数据库过程中, 我们经常会遇到这样的诉求:希望能够用线上业务的真实流量去模拟、压测测试环境,这样能够在业务上线、变更之前发现一些问题。本方案基于 RDS/Aurora MySQL 为流量模拟对象,总结了一些常见的方案,并且对流量复制这种对线上业务侵入性最小的方式展开了详细的介绍。

二、方案选型与对比

VPC 流量镜像

流量镜像是亚马逊 VPC 网络的一项功能,您可以使用它从弹性网络接口复制网络流量。然后,您可以将流量发送到其他监控设备,常用于:

  • 内容检查
  • 威胁监控
  • 故障排除

安全和监控设备可以部署为单个实例,也可以部署为网络负载平衡器或具有 UDP 侦听器的网关负载平衡器后面的实例组。流量镜像支持过滤和数据包截断,因此您可以使用自己选择的监控工具仅提取感兴趣的流量。

三、方案架构设计

下图是 VPC 流量复制方案实施的架构图,借助蓝绿部署的方式,把线上的数据同步复制到测试环境,同时使用 VPC 流量镜像功能来捕获源端网卡的流量,把其流量复制到了一台 EC2 服务器,然后开发程序对该 EC2 的网卡进行监听并解析,从而得到原始 SQL,进而按照期望的诉求投递到目标端数据库实例中。

四、流程描述

接下来我们看一下通过流量复制进行性能压测需要哪些基本步骤:

  1. [控制台配置] 配置 VPC Traffic Mirroring,源端为数据库网卡,目标端为一台 EC2 实例的网卡
  2. [EC2 配置] 反解 vxlan 路由镜像的数据包(原始镜像过来的流量加了 vxlan header)
  3. [开发工作] 监听目标网卡数据,并根据 MySQL 协议解析数据包内容
  4. [开发工作] 提取 query 字段信息,进一步筛选 COM_QUERY (mysql.command==3) 得出 SQL
  5. [开发工作] 把 SQL 按照规则发送到绿集群执行(忽略变更、回放倍数、错误计数等)
  6. [控制台观察] Cloudwatch 对比观察结果

五、详细步骤

1. VPC 流量镜像配置

首先,我们需要通过 VPC 的流量镜像功能,配置一个由源端(RDS 实例所在的网卡)到目标端(用于解析流量的 EC2)的会话,具体操作步骤如下:

  • 登录 AWS 控制台 – VPC – 流量镜像
  • 如下图创建 Mirror Target,关联到 EC2 的网卡(可以从 EC2 控制台的 “联网” TAB 查看其网络接口对应的网卡接口 ID)
  • 如下图创建 Mirror Filters,过滤对应端口的 TCP 流量
  • 创建 Mirror Sessions,选择镜像源为 RDS 的网卡 ID,目标为之前创建的流量镜像目标,以及选择刚创建的筛选条件。对于 RDS 可以使用 nslookup 获得 endpoint 的 IP 地址,然后从 EC2 控制台的“Network Interfaces”页面根据 IP 地址找到对应的网卡 ID
  • 镜像流量配置完成

2. 镜像流量解析

在完成 VPC 流量镜像解析后,我们接下来需要在 EC2 端把这些数据包进行处理,便可以得出原始的三层流量数据包,具体操作步骤如下:

  • 镜像流量通过 UDP 投递到目标 4789 端口,所以目标 EC2 需要在安全组允许该入口流量策略,如下图:
  • 由于镜像流量在原始流量上加了 xvlan 的 header 信息,我们可以创建一个 vxlan 的虚拟设备,反解析出原始的数据包
    ### 创建一个名为 capture0 的网络 interface
    ### 把 eth0 的二层 vxlan 报文根据 VNI 去掉 vxlan 的头部发送到该 inerface 
    ### 其中 vxlan id 换成 "Mirror sessions" 中的 VNI 值
    sudo ip link add capture0 type vxlan id 11730587 dev eth0 dstport 4789
    sudo ip link set capture0 up
  • 使用 tcpdump 判断流量转发链路是否已经正常:sudo tcpdump -vv -i capture0

3. MySQL 协议解析

根据前面的步骤得到原始数据包之后,我们便可以监听 capture0 网卡,根据对应的协议来解析出需要的内容,下面的例子是针对 MySQL 协议作为解析对象。同样我们可以使用类似的思路,换成其他数据库协议来对不同的数据库流量进行解析。

MySQL 数据库连接的生命周期分为连接阶段和命令交互阶段,我们本篇就不对连接阶段做过多赘述,更关注于流量复制需要解析的命令阶段。

3.1 MySQL 数据包格式

数据包被 MySQL 协议封装后格式如下,我们需要提取出 payload 中的内容

  • payload_length:标明 payload 长度
  • sequence_id:包的序列号
  • payload:实际数据内容

3.2 命令类型过滤

接下来我们需要从 payload 进行进一步筛选出请求 SQL 语句,对于仅仅提取业务 SQL 而言,需要拿出 COM_QUERY 类型的内容

3.3 返回结果数据包

如果需要对结果集进行解析比对,则需要请求返回包进行解析,可参考:https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_basic_ok_packet.html

4. 对解析出来的流量分析并重放

一些开源的解析方案

有一些开源的流量监控并解析 MySQL 协议的工具,可以帮助我们节省这一步的开发工作量直接获取到原始的 SQL 语句,比如:

在得到原始 SQL 之后,我们便可以按照预想的形式进行过滤(比如只重放查询语句)、增加 QPS 倍数等自定义操作,把这些 SQL 语句发送到目标集群,在进行重放的过程中,需要注意以下几点:

  • 重放流量需要考虑投递性能(建议使用异步),否则容易在重放程序端出现瓶颈
  • 对于包含变更语句的流量重放,不建议开启倍数,否则数据和源端可能数据不一致,以及一致性
  • 对于需要对结果集进行比对的场景,建议增加一层消息队列,异步进行一致性对比

六、限制

使用本方案需要考虑如下的限制因素:

七、结束语

本博客为大家展示了如何通过 VPC 流量镜像克隆 MySQL 请求流量到另外一套环境。在实际使用中我们可以通过该思路衍生出更多的场景功能,比如历史活动流量保存、跨环境 SQL 验证、多倍数流量压测、热点数据捕获等等,希望对大家有所帮助。

附录 1:基于 tshark 的流量复制压测方案示例代码

package main

// 注意: 本方案为 poc 测试代码, 作为方案思路讲解, 线上使用前请充分验证 

import (
    "bufio"
    "database/sql"
    "flag"
    "fmt"
    "io"
    "log"
    "os"
    "os/exec"
    "regexp"

    _ "github.com/go-sql-driver/mysql"
)

var (
    db         *sql.DB
    flgHelp    bool
    printsql   int
    selectonly int
    host       string
    port       int
    user       string
    password   string
    database   string
    ratio      int
    totalNum   int
    errorNum   int
    skipNum    int
)

func main() {

    // 参数解析
    parseCmdLineFlags()
    if flgHelp {
        flag.Usage()
        os.Exit(0)
    }
    flag.Args()
    fmt.Println("\nRunning with Arguments: ")
    fmt.Printf(" * selectonly: %d\n", selectonly)
    fmt.Printf(" * printsql: %d\n", printsql)
    fmt.Printf(" * ratio: %d\n", ratio)
    fmt.Printf(" * host: %s\n", host)
    fmt.Printf(" * port: %d\n", port)
    fmt.Printf(" * user: %s\n\n", user)

    // 初始化数据库
    err := InitDB()
    //defer db.Close()
    if err != nil {
        errMsg := fmt.Sprintf("Failed to connect to database: %s:%s@tcp(%s:%d)/%s?charset=utf8mb4", user, password, host, port, database)
        log.Fatal(errMsg)
        log.Fatal("Error: %v\n", err)
        return
    }
    ExecCommand()
    //RunCommand()
}

// 参数解析
func parseCmdLineFlags() {
    flag.BoolVar(&flgHelp, "help", false, "Show help, this tool is only for Aurora MySQL/RDS MySQL test")
    flag.IntVar(&printsql, "printsql", 0, "Set 1 to print all sqls")
    flag.IntVar(&selectonly, "selectonly", 1, "Set 0 to apply all type sqls including select/insert/update/delete")
    flag.StringVar(&host, "h", "127.0.0.1", "Target mysql host addr")
    flag.IntVar(&port, "P", 3306, "Target mysql port")
    flag.StringVar(&user, "u", "root", "Target mysql user")
    flag.StringVar(&password, "p", "", "Target mysql password")
    flag.StringVar(&database, "db", "", "Target mysql database")
    flag.IntVar(&ratio, "ratio", 1, "Ratio for target replay, default is 1")
    flag.Parse()
}

// 初始化数据库
func InitDB() (err error) {
    log.Println("Init database connection")
    dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", user, password, host, port, database)
    db, err = sql.Open("mysql", dsn)
    if err != nil {
        return err
    }
    db.SetMaxOpenConns(128)
    db.SetMaxIdleConns(32)

    // 尝试与数据库建立连接
    err = db.Ping()
    if err != nil {
        return err
    } else {
        log.Println("Database is connected successfully")
    }
    return nil
}

// 执行 SQL 语句
func RunSQL(sqlStr string) (err error) {
    if printsql == 1 {
        log.Println("[ Run SQL ] ", sqlStr)
    }
    if totalNum%10000 == 0 {
        log.Println("[ Execute Summary ] ", totalNum, "sqls in total,", errorNum, "failed,", skipNum, "skipped")
    }
    _, err = db.Exec(sqlStr)
    if err != nil {
        errorNum += 1
        log.Println("Query Error, SQL:", err)
        log.Println("Query Error, Msg:", err)
        return
    }
    //defer db.Close()
    return
}

// 输出内容
func ExecCommand() {
    var contentArray = make([]string, 0, 5)
    contentArray = contentArray[0:0]
    cmd := exec.Command("sudo", "tshark", "-i", "capture0", "-T", "fields", "-e", "mysql.query", "-Y", "mysql.command==3", "-l")
    //cmd := exec.Command("sudo", "/home/ec2-user/demo-1/mysql-sniffer", "-i", "capture0")
    //显示运行的命令
    log.Println("Run command: ", cmd)
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        fmt.Fprintln(os.Stderr, "error=>", err.Error())
        return
    }
    cmd.Start() // Start开始执行包含的命令,但并不会等待该命令完成即返回。Wait方法会返回命令的返回状态码并在命令返回后释放相关的资源。

    reader := bufio.NewReader(stdout)

    var index int
    //实时循环读取输出流中的一行内容
    for {
        line, err2 := reader.ReadString('\n')
        if err2 != nil || io.EOF == err2 {
            break
        }
        //fmt.Println(line)
        GetAndFilterOutput(line)
        index++
        contentArray = append(contentArray, line)
    }

    cmd.Wait()
    return
}

// 过滤内容
func GetAndFilterOutput(output string) {

    sqlStr := output
    /*
        // 适用于 mysql-sniffer 的输出结果分割
        fmt.Print("\n==> : ", output, "\n")
        r := regexp.MustCompile("[^\\s]+")
        sqlStr := r.FindAllString(output, -1)[7]
        fmt.Print("\n==> : ", sqlStr, "\n")
    */

    // 对内容进行过滤
    if sqlStr == "\n" {
        return
    }
    /*
        TODO: SQL split 当前仅适配 sysbench 语句, 对于复杂业务语句目前需要做适配性改造
    */

    pattern := "(?i)^SELECT|^INSERT|^UPDATE|^DELETE"
    if selectonly == 1 {
        pattern = "(?i)^SELECT"
    }
    matched, _ := regexp.MatchString(pattern, sqlStr)

    totalNum += 1
    if matched {
        //fmt.Print("Run: ", sqlStr)
        for i := 0; i < ratio; i++ {
            go RunSQL(sqlStr)
        }
    } else {
        skipNum += 1
        //fmt.Print("SKIP: ", sqlStr)
    }

    return
}

使用方法以及效果展示

附录 2:参考资料

本篇作者

陈阳

AWS 数据库专家架构师,十余年数据库行业经验,主要负责基于亚马逊云计算数据库产品的解决方案与架构设计工作。在加入 AWS 之前曾在哔哩哔哩、IBM、Verizon 等企业担任 DBA,在海量数据架构设计、自动化运维、稳定性保障等方面有丰富的经验。