nJcx's Blog

十年生死两茫茫,写程序,到天亮。相顾无言,惟有泪千行

让PacketBeat 插上DPDK的翅膀


介绍

Packetbeat 是一款开源的网络数据包分析工具,由Elastic公司开发并维护。它能够实时捕获和分析网络中的传输数据,主要用于监控和分析应用层协议的数据传输情况,如HTTP、MySQL、PostgreSQL等。通过这种方式,Packetbeat可以帮助用户了解网络性能、追踪应用程序错误以及监控安全事件。

Packetbeat的工作原理

  • 数据抓取:Packetbeat使用libpcap库(在Linux和macOS上)或WinPcap/Npcap(在Windows上)来抓取网络接口上的数据包。
  • 协议解码:一旦抓取到数据包,Packetbeat会根据配置解析不同的协议,提取出相关的请求和响应信息。
  • 数据发送:处理完的数据会被发送到指定的输出位置,通常是Elasticsearch,也可以是Logstash或者其他兼容的系统。

Packetbeat非常适合用于需要对网络流量进行深度分析的环境,例如企业内部网、云服务提供商、以及任何希望提升其应用性能和安全性的地方。结合Elastic Stack的其他组件(如Kibana),用户可以创建强大的可视化面板,以便更好地理解和管理他们的网络数据。早期的Packetbeat 支持Pf-ring,后来就不再支持了。为Packetbeat添加DPDK(Data Plane Development Kit)支持,可以显著提高其网络数据包处理能力,特别是对于高吞吐量和低延迟要求的场景。DPDK是一个优化了的数据包处理框架,它绕过了操作系统内核协议栈,直接在用户空间处理网络数据包,从而实现了高效的数据传输。Packetbeat依赖gopacket, 建议先读这个文章,作为前置知识, 当让gopacket 支持DPDK, 约等于让Packetbeat支持DPDK 。 https://www.njcx.bid/posts/T4.html

DPDK的安装

CentOS
#  yum install -y libpcap-devel gcc gcc-c++ make meson ninja  numactl-devel  numactl  net-tools pciutils
#  yum install -y kernel-devel-$(uname -r) kernel-headers-$(uname -r)

Debian + Ubuntu
# apt install -y libpcap-dev gcc g++ make meson ninja-build libnuma-dev numactl net-tools pciutils
# apt install -y linux-headers-$(uname -r)


#  wget http://fast.dpdk.org/rel/dpdk-20.11.10.tar.xz
#  tar -Jxvf dpdk-20.11.10.tar.xz
#  cd dpdk-stable-20.11.10 && meson build && cd build && ninja && ninja install
#  export LD_LIBRARY_PATH=/usr/local/lib64:$LD_LIBRARY_PATH
#  git clone git://dpdk.org/dpdk-kmods && cd  dpdk-kmods/linux/igb_uio
#  make
#  modprobe uio  &&  insmod igb_uio.ko
#  dpdk-devbind.py --status
#  ifconfig ens38 down   ## 填写实际网卡
#  dpdk-devbind.py -b igb_uio 0000:03:00.0(pci-addr)  ## 根据实际填写

#  echo "vm.nr_hugepages=1024" | tee -a /etc/sysctl.conf
#  sysctl -p

Packetbeat 拆解

Packetbeat 7 的函数调用链:

主程序入口 (main.go)
  ↓
命令行初始化 (cmd/root.go) 
    
Beater初始化与运行 (beater/packetbeat.go)
  ↓
协议注册和管理 (protos/registry.go)
   
网络协议处理 (TCP/UDP) (protos/tcp/tcp.go, protos/udp/udp.go)
  ↓
应用协议解析 (protos/{http,mysql,redis等})
  ↓
事件发布 (publish/publish.go)

Beater组件 (packetbeat.go):

  • 负责整体生命周期管理
  • 配置加载和验证
  • 网络接口初始化
  • 协议处理器注册
type packetbeat struct {
  config *conf.C 
  factory *processorFactory
  overwritePipelines bool
  done chan struct{}
  stopOnce sync.Once
}

协议注册中心 (registry.go):

  • 管理所有注册的协议处理器
  • 提供协议查找和插件管理功能
  • 定义协议插件接口规范
// 协议插件接口
type Plugin interface {
  GetPorts() []int
}

type TCPPlugin interface {
  Plugin
  Parse(pkt *Packet, ...) ProtocolData
  ReceivedFin(...) ProtocolData  
  GapInStream(...) (ProtocolData, bool)
}

type UDPPlugin interface {
  Plugin
  ParseUDP(pkt *Packet)
}

数据处理流程

  • 网卡/pcap读取原始数据包
  • TCP/UDP层识别和处理
  • 根据端口映射确定应用协议
  • 调用对应协议插件进行解析
// 判断数据采集方式

func (s *Sniffer) open() (snifferHandle, error) {
    if s.config.File != "" {
        return newFileHandler(s.config.File, s.config.TopSpeed, s.config.Loop)
    }

    switch s.config.Type {
    case "pcap":
        return openPcap(s.filter, &s.config)
    case "af_packet":
        return openAFPacket(s.filter, &s.config)
    case "dpdk":
        return openDpdk(s.filter)
    default:
        return nil, fmt.Errorf("Unknown sniffer type: %s", s.config.Type)
    }
}
func (stream *TCPStream) addPacket(pkt *protos.Packet, tcphdr *layers.TCP) {
  // 1. 获取协议处理器
  mod := conn.tcp.protocols.GetTCP(conn.protocol)

  // 2. 调用协议解析
  if len(pkt.Payload) > 0 {
    conn.data = mod.Parse(pkt, &conn.tcptuple, stream.dir, conn.data)
  }

  // 3. 处理FIN标记
  if tcphdr.FIN {
    conn.data = mod.ReceivedFin(&conn.tcptuple, stream.dir, conn.data) 
  }
}

事件发布写出

func (p *TransactionPublisher) worker(ch chan beat.Event, client beat.Client) {
  for {
    select {
    case event := <-ch:
      // 处理和发布事件
      pub, _ := p.processor.Run(&event)
      if pub != nil {
        client.Publish(*pub)
      }
    }
  }
}

让Packetbeat 支持DPDK

DPDK(Data Plane Development Kit)初始化过程涉及多个步骤,主要是为了准备环境以便高效处理网络数据包。以下是DPDK初始化过程中的一些关键步骤和内容:

  • EAL(Environment Abstraction Layer)初始化:这是DPDK初始化的核心部分。EAL提供了对底层硬件资源的抽象,包括内存管理、CPU核心分配、设备识别与初始化等。通过调用rte_eal_init()函数来完成,此过程会解析命令行参数,并设置好运行时环境。
  • 内存分配:DPDK需要大量的连续物理内存来存储数据包和执行高效的数据传输。在EAL初始化阶段,会预留一块大页内存(hugepage),用于后续的数据包处理操作。

特别要注意的是:EAL 初始化和 DPDK端口初始化,要在主线程初始化。不然会报错如下:

EAL: Detected 40 lcore(s)
EAL: Detected 2 NUMA nodes
EAL: Error creating '/var/run/dpdk': Operation not permitted
EAL: Cannot create runtime directory
EAL: FATAL: Invalid 'command line' arguments.
EAL: Invalid 'command line' arguments.

由于 Packetbeat 使用了 Cobra, 并且初始化的操作都是在 Cobra的子命令里面完成的, Cobra 的子命令都是非主线程, 如果把DPDK的初始化放到package sniffer 里面, 则会报错如上。所以修改main.go, 把DPDK 的初始化放到 Cobra 的子命令初始化之前.

package main

import (
    "github.com/njcx/packetbeat7_dpdk/cmd"
    "github.com/njcx/packetbeat7_dpdk/dpdkinit"
    "os"
)

var Name = "packetbeat"

func main() {
    dpdkinit.DpdkInit()
    if err := cmd.RootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}

由于,我们把DPDK 的初始化放到 Cobra 的子命令初始化之前, 导致我们无法读取 Cobra的命令, 也无法利用后续的配置文件初始化. 所以, 我们可以在里面添加一些命令行参数, 用于DPDK 的初始化, 不能用spf13/pflag 和 标准库flag, 会和后续初始化产生冲突. 简单实现一个命令行参数提取代码 :

package dpdkinit

import (
    "fmt"
    "os"
    "strings"
)

func Parse(key string) (string, error) {
    args := os.Args[1:]
    for i := 0; i < len(args); i++ {
        arg := args[i]
        if strings.HasPrefix(arg, "--") {
            name := strings.TrimPrefix(arg, "--")
            if i+1 >= len(args) {
                return "", fmt.Errorf("no value provided for flag: %s", name)
            }
            return args[i+1], nil
        }

        if strings.HasPrefix(arg, "-") {
            shorthand := strings.TrimPrefix(arg, "-")
            if i+1 >= len(args) {
                return "", fmt.Errorf("no value provided for flag: %s", shorthand)
            }
            return args[i+1], nil
        }
    }

    return "", fmt.Errorf("cant find this key: %s", key)
}

DPDK 初始化:

var DPdk *dpdk.DPDKHandle

func DpdkInit() error {

    dpdkPort, _ := Parse("dpdk_port")
    dpdkStatus, _ := Parse("dpdk_status")
    if dpdkStatus == "enable" {
        err := dpdk.InitDPDK([]string{})
        if err != nil {
            return err
        }
        num16, _ := strconv.ParseUint(dpdkPort, 10, 16)
        port := uint16(num16)

        h, err := dpdk.NewDPDKHandle(port)
        if err != nil {
            h.Close()
            return err
        }
        DPdk = h
    }
    return nil
}

DPDKHandle 已经实现了 snifferHandle的接口,直接赋值:

func openDpdk(filter string) (snifferHandle, error) {

    h := dpdkinit.DPdk
    err := h.SetBPFFilter(filter)
    if err != nil {
        h.Close()
        return nil, err
    }
    return h, nil
}

构建:

CGO_CFLAGS="-msse4.2 -fno-strict-aliasing " CGO_LDFLAGS=" -lrte_eal -lrte_mbuf -lrte_mempool -lrte_ethdev -lpcap" go build

执行:

./packetbeat8_dpdk --dpdk_status enable --dpdk_port 0 -c ~/go/packetbeat.dpdk.yml 

在配置文件里面标记输入源, 用于后续初始化:

packetbeat.interfaces.device: 0
packetbeat.interfaces.snaplen: 1514
packetbeat.interfaces.type: dpdk
packetbeat.interfaces.buffer_size_mb: 100

运行正常,代码repo 如下, 全文完。

https://github.com/njcx/packetbeat8_dpdk
https://github.com/njcx/packetbeat7_dpdk
https://github.com/njcx/packetbeat6_dpdk

dpdk