scapy抓包使用

2021/11/19 6:39:54

本文主要介绍如何使用 scapy 抓包,并用 dpkt 解析抓到的数据包,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

# coding=utf-8
import json
import time
import os
import dpkt
import socket
import datetime
import uuid
import traceback

from dpkt.ethernet import Ethernet
from scapy.layers.l2 import Ether
from scapy.sendrecv import sniff
from scapy.utils import wrpcap

from BigData.data_common.utils.file_util import FileUtil


def get_local_ip():
    """Return the address list the resolver reports for the local hostname.

    The value is the last element of the tuple returned by
    ``socket.gethostbyname_ex`` for ``socket.gethostname()``.
    """
    _name, _aliases, addresses = socket.gethostbyname_ex(socket.gethostname())
    return addresses

def body_transfer(body):
    """Parse a ``key=value&key=value`` body into a dict.

    Args:
        body: The raw body as ``bytes``/``bytearray`` (decoded with the
            default UTF-8 codec) or as an already-decoded ``str``.

    Returns:
        A dict mapping each stripped key to its stripped value. Blank
        segments are skipped, a segment without ``=`` maps to ``""``, and
        only the first ``=`` separates key from value, so values may
        themselves contain ``=``.

    Raises:
        UnicodeDecodeError: If ``body`` is bytes that are not valid UTF-8.
    """
    text = body.decode() if isinstance(body, (bytes, bytearray)) else body
    fields = {}
    for segment in text.split("&"):
        if not segment.strip():
            # An empty body or a stray "&" yields blank segments; ignore them.
            continue
        name, _, value = segment.partition("=")
        fields[name.strip()] = value.strip()
    return fields


def analysis_pcap(timestamp, buf):
    """Decode one captured packet into a nested dict of protocol fields.

    Args:
        timestamp: Capture time; stored unchanged under the ``'time'`` key.
        buf: Raw Ethernet frame bytes, or an already-parsed ``dpkt.ip.IP`` /
            ``dpkt.ip6.IP6`` packet (``analysis_icmp`` passes the payload of
            an ICMP message here).

    Returns:
        For IP traffic: ``{'time': ..., 'IPV4' | 'IPV6': {'src', 'dst'}}``
        merged with whatever ``analysis_tcp`` / ``analysis_udp`` /
        ``analysis_icmp`` return for the transport layer. Addresses are
        rendered as text for both IPv4 and IPv6.
        For an IP packet with any other transport:
        ``{'time': ..., '<IP class name>': {}}``.
        For non-IP frames: ``{'time': ..., '<class name>': <its __dict__>}``.
    """
    ip_types = (dpkt.ip.IP, dpkt.ip6.IP6)
    if isinstance(buf, ip_types):
        # Already a network-layer packet: there is no Ethernet header to strip.
        packet = buf
    else:
        packet = dpkt.ethernet.Ethernet(buf).data

    if not isinstance(packet, ip_types):
        print('analysis_pcap eth.data {}'.format(repr(packet)))
        # The payload may be plain bytes (no __dict__) rather than a parsed
        # packet object, so fall back to an empty mapping in that case.
        return {
            'time': timestamp,
            packet.__class__.__name__: getattr(packet, '__dict__', {}),
        }

    if isinstance(packet, dpkt.ip.IP):
        label = 'IPV4'
        src_ip = socket.inet_ntoa(packet.src)
        dst_ip = socket.inet_ntoa(packet.dst)
    else:
        label = 'IPV6'
        # Render the packed IPv6 addresses as text, matching the IPv4 branch.
        src_ip = socket.inet_ntop(socket.AF_INET6, packet.src)
        dst_ip = socket.inet_ntop(socket.AF_INET6, packet.dst)
    data = {
        'time': timestamp,
        label: {'src': src_ip, 'dst': dst_ip},
    }

    transport = packet.data
    print('ip.data type is {}'.format(transport.__class__.__name__))
    if isinstance(transport, dpkt.tcp.TCP):
        data.update(analysis_tcp(transport))
    elif isinstance(transport, dpkt.udp.UDP):
        data.update(analysis_udp(transport))
    elif isinstance(transport, (dpkt.icmp.ICMP, dpkt.icmp6.ICMP6)):
        data.update(analysis_icmp(transport))
    else:
        print('analysis_pcap ip.data {}'.format(repr(transport)))
        # Unhandled transport: only the network-layer class name is reported;
        # the address entry built above is not kept.
        data = {'time': timestamp, packet.__class__.__name__: {}}
    return data

def analysis_udp(udp, key="UDP"):
    """Summarise a UDP datagram as ``{key: {...}}``.

    Args:
        udp: Object exposing ``sport``, ``dport``, ``ulen``, ``sum`` and a
            bytes ``data`` attribute.
        key: Name of the single top-level key in the result.

    Returns:
        ``{key: {'sport', 'dport', 'ulen', 'sum', 'data'}}``. ``'data'`` is
        a dict of ``name: value`` header lines when the UTF-8 payload starts
        with ``M-SEARCH``, the decoded text for any other UTF-8 payload, and
        an empty dict when the payload cannot be decoded.
        Returns ``{}`` (after printing the packet) when a header attribute
        is missing.
    """
    payload = {}
    try:
        text = udp.data.decode('utf-8')
    except (UnicodeDecodeError, AttributeError):
        # Binary payload, or ``data`` is not bytes: keep the empty dict.
        text = None

    if text is not None:
        if text.startswith('M-SEARCH'):
            # Skip the request line; every following line is "name: value".
            for line in text.strip().split('\n')[1:]:
                name, _, value = line.partition(':')
                payload[name.strip()] = value.strip()
        else:
            payload = text

    try:
        return {
            key: {
                'sport': udp.sport,
                'dport': udp.dport,
                'ulen': udp.ulen,
                'sum': udp.sum,
                'data': payload,
            }
        }
    except AttributeError:
        print('analysis_udp udp {}'.format(repr(udp)))
        return {}

def analysis_tcp(tcp, key='TCP'):
    """Summarise a TCP segment and, when possible, the HTTP message it carries.

    Args:
        tcp: Segment exposing ``dport``, ``sport``, ``ack``, ``seq`` and a
            bytes ``data`` payload.
        key: Name of the top-level key that holds the TCP fields.

    Returns:
        ``{key: {'dport', 'sport', 'ack', 'seq'[, 'Data']}}`` plus an
        ``'HTTP'`` entry when the payload parses as an HTTP request or
        response. ``'Data'`` is added only for a non-empty UTF-8 payload:
        a dict of header lines when it starts with ``M-SEARCH``, otherwise
        the decoded text.
    """
    data = {
        key: {
            'dport': tcp.dport,
            'sport': tcp.sport,
            'ack': tcp.ack,
            'seq': tcp.seq,
        }
    }
    payload = tcp.data

    http_section = _http_section(payload)
    if http_section is not None:
        data['HTTP'] = http_section

    try:
        text = payload.decode('utf-8')
    except (UnicodeDecodeError, AttributeError):
        # Binary payload: there is no text view to report.
        text = ''
    if text.startswith('M-SEARCH'):
        # Skip the request line; every following line is "name: value".
        content = {}
        for line in text.strip().split('\n')[1:]:
            name, _, value = line.partition(':')
            content[name.strip()] = value.strip()
    else:
        content = text
    if content:
        data[key]['Data'] = content
    return data


def _form_fields(raw):
    """Return ``body_transfer(raw)``, or None when ``raw`` is not a form body."""
    try:
        return body_transfer(raw)
    except ValueError:
        # Covers UnicodeDecodeError too. A body that is not form-encoded
        # must not throw away the already-parsed HTTP headers.
        return None


def _http_section(payload):
    """Build the ``'HTTP'`` entry for ``payload``; None if it is not HTTP."""
    try:
        request = dpkt.http.Request(payload)
    except Exception:
        # Best-effort probe of arbitrary bytes: most segments are not HTTP.
        # ``Exception`` (not a bare except) keeps Ctrl-C working.
        request = None
    if request is not None:
        return {
            'type': 'request',
            'uri': request.uri,
            'Method': request.method.upper(),
            'Headers': dict(request.headers),
            'Body': _form_fields(request.body),
            'Data': _form_fields(request.data),
        }

    try:
        # Parse the segment payload, exactly as for requests; handing the
        # TCP object itself to the parser can never succeed.
        response = dpkt.http.Response(payload)
    except Exception:
        return None
    return {
        'type': 'response',
        'Headers': dict(response.headers),
        'Body': _form_fields(response.body),
        'Data': _form_fields(response.data),
    }

def analysis_icmp(icmp, key='ICMP'):
    """Summarise an ICMP or ICMPv6 message.

    Args:
        icmp: A ``dpkt.icmp.ICMP`` instance, or any other object (the caller
            passes ``dpkt.icmp6.ICMP6``) exposing ``type``, ``sum`` and
            ``data``.
        key: Accepted for signature parity with the other ``analysis_*``
            functions; the result key is chosen from the packet type
            (``'ICMP'`` or ``'ICMP6'``), not from this argument.

    Returns:
        ``{'ICMP': {'type', 'sum', 'Data'}}`` where ``'Data'`` is
        ``analysis_pcap`` applied to ``icmp.data.data`` (stamped with the
        current time, since no capture time is available here), or
        ``{'ICMP6': {'type', 'sum', 'Data'}}`` where ``'Data'`` is the UTF-8
        text of the payload (``''`` if it cannot be decoded).
        Returns ``{}`` (after printing the packet) if the message cannot be
        decoded.
    """
    try:
        if isinstance(icmp, dpkt.icmp.ICMP):
            return {
                'ICMP': {
                    'type': icmp.type,
                    'sum': icmp.sum,
                    'Data': analysis_pcap(int(time.time()), icmp.data.data),
                }
            }

        try:
            text = icmp.data.decode('utf-8')
        except (UnicodeDecodeError, AttributeError):
            # Payload is binary or a parsed sub-packet rather than bytes.
            text = ''
        return {
            'ICMP6': {
                'type': icmp.type,
                'sum': icmp.sum,
                'Data': text,
            }
        }
    except Exception:
        # Decoding is best-effort, but ``Exception`` (not a bare except)
        # lets Ctrl-C stop the capture loop. Report the packet the same way
        # analysis_udp does instead of failing silently.
        print('analysis_icmp icmp {}'.format(repr(icmp)))
        return {}


def analysis_pcap2(timestamp, buf):
    """Dissect ``buf`` as a scapy ``Ether`` frame and call its ``show()``.

    ``timestamp`` is accepted but not used.
    """
    Ether(buf).show()

def get_dpkt(count=10, iface=None):
    """Sniff packets with scapy and save them to a new pcap file.

    Args:
        count: Number of packets to capture before returning (default 10).
        iface: Name of the interface to capture on, e.g.
            ``"Qualcomm QCA9377 802.11ac Wireless Adapter"``. When ``None``
            no interface is passed to ``sniff``, which suits machines with
            a single network card.

    Returns:
        The name of the written file, ``"<uuid1>.pcap"``, relative to the
        current working directory.
    """
    sniff_kwargs = {'count': count}
    if iface is not None:
        sniff_kwargs['iface'] = iface
    packets = sniff(**sniff_kwargs)
    filename = f"{uuid.uuid1()}.pcap"
    wrpcap(filename, packets)
    return filename


def main():
    """Capture, decode and print packets in an endless loop.

    Each iteration captures one batch of packets into a pcap file via
    ``get_dpkt``, prints the ``analysis_pcap`` result for every packet in
    it, and then deletes the file.
    """
    while True:
        filename = get_dpkt()
        try:
            with open(filename, "rb") as f:
                for timestamp, buf in dpkt.pcap.Reader(f):
                    print(analysis_pcap(timestamp, buf))
        finally:
            # Delete the capture file even when decoding raises or the user
            # presses Ctrl-C, so stray .pcap files do not pile up.
            os.remove(filename)

# Start the capture loop only when this file is executed as a script.
if __name__ == '__main__':
    main()

 



这篇关于scapy抓包使用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程