scapy抓包使用
2021/11/19 6:39:54
本文主要介绍 scapy 抓包的使用，对大家解决编程问题具有一定的参考价值，需要的程序员们可以跟随小编一起学习！
# coding=utf-8
"""Capture live traffic with scapy and decode every frame with dpkt.

The script sniffs a small batch of packets, stores them in a temporary pcap
file, re-reads that file with dpkt and prints one summary dict per frame
(IP addresses plus TCP / UDP / ICMP / HTTP details when they can be decoded).
"""
import json
import time
import os
import dpkt
import socket
import datetime
import uuid
import traceback
from dpkt.ethernet import Ethernet
from scapy.layers.l2 import Ether
from scapy.sendrecv import sniff
from scapy.utils import wrpcap
from BigData.data_common.utils.file_util import FileUtil

# Errors that merely mean "this payload is not the protocol we tried to parse".
# Decoding is best-effort, so these are swallowed; anything else propagates.
_PARSE_ERRORS = (dpkt.UnpackError, ValueError, TypeError, AttributeError,
                 IndexError)


def get_local_ip():
    """Return the list of IPv4 addresses the local hostname resolves to."""
    hostname = socket.gethostname()
    return socket.gethostbyname_ex(hostname)[-1]


def body_transfer(body):
    """Parse a ``key=value&key=value`` byte string into a dict.

    Empty input yields an empty dict.  Items without ``=`` map to an empty
    string and only the first ``=`` of an item separates key from value, so
    values such as base64 padding survive.  Undecodable bytes are replaced
    rather than raising.
    """
    if not body:
        return {}
    text = body.decode('utf-8', errors='replace')
    fields = {}
    for item in text.split('&'):
        if not item.strip():
            continue
        name, _, value = item.partition('=')
        fields[name.strip()] = value.strip()
    return fields


def _parse_text_payload(raw):
    """Decode a raw payload as UTF-8 text.

    Returns a header dict for ``M-SEARCH`` messages, the decoded string for
    any other text, and ``{}`` when *raw* is not UTF-8 decodable bytes.
    """
    try:
        text = raw.decode('utf-8')
    except (UnicodeDecodeError, AttributeError):
        return {}
    if not text.startswith('M-SEARCH'):
        return text
    headers = {}
    # The first line is the request line; the rest are "Name: value" headers.
    for line in text.strip().split('\n')[1:]:
        name, _, value = line.partition(':')
        headers[name.strip()] = value.strip()
    return headers


def _parse_http(payload):
    """Return an HTTP summary dict for *payload*, or ``None`` if not HTTP."""
    if not payload:
        return None
    try:
        request = dpkt.http.Request(payload)
    except _PARSE_ERRORS:
        pass
    else:
        return {
            'type': 'request',
            'uri': request.uri,
            'Method': request.method.upper(),
            'Headers': dict(request.headers),
            'Body': body_transfer(request.body),
            'Data': body_transfer(request.data)
        }
    try:
        # The parser needs the TCP payload bytes, not the TCP object itself.
        response = dpkt.http.Response(payload)
    except _PARSE_ERRORS:
        return None
    return {
        'type': 'response',
        'Headers': dict(response.headers),
        'Body': body_transfer(response.body),
        'Data': body_transfer(response.data)
    }


def analysis_pcap(timestamp, buf):
    """Decode one captured frame into a summary dict.

    Args:
        timestamp: capture time stored under the ``'time'`` key.
        buf: raw Ethernet frame bytes, or an already parsed dpkt IP / IP6
            packet (used for packets quoted inside ICMP messages).

    Returns:
        A dict with ``'time'``, an ``'IPV4'`` or ``'IPV6'`` entry holding the
        textual ``src`` / ``dst`` addresses, and the transport-layer entries
        produced by the ``analysis_*`` helpers.  Non-IP frames yield
        ``{'time': ..., <payload class name>: <payload attributes>}``.

    Raises:
        dpkt.UnpackError: if *buf* cannot be parsed as an Ethernet frame.
    """
    if isinstance(buf, (dpkt.ip.IP, dpkt.ip6.IP6)):
        # Already the network layer: do not descend one level too far.
        ip = buf
    else:
        ip = dpkt.ethernet.Ethernet(buf).data

    if isinstance(ip, dpkt.ip.IP):
        data = {
            'time': timestamp,
            'IPV4': {'src': socket.inet_ntoa(ip.src),
                     'dst': socket.inet_ntoa(ip.dst)}
        }
    elif isinstance(ip, dpkt.ip6.IP6):
        # Render the 16-byte addresses as text so the result is printable
        # and JSON-serialisable, like the IPv4 branch.
        data = {
            'time': timestamp,
            'IPV6': {'src': socket.inet_ntop(socket.AF_INET6, ip.src),
                     'dst': socket.inet_ntop(socket.AF_INET6, ip.dst)}
        }
    else:
        print('analysis_pcap eth.data {}'.format(repr(ip)))
        # The payload may be raw bytes (unknown EtherType), which has no
        # __dict__.
        return {'time': timestamp,
                ip.__class__.__name__: getattr(ip, '__dict__', {})}

    transport = ip.data
    print('ip.data type is {}'.format(transport.__class__.__name__))
    if isinstance(transport, dpkt.tcp.TCP):
        data.update(analysis_tcp(transport))
    elif isinstance(transport, dpkt.udp.UDP):
        data.update(analysis_udp(transport))
    elif isinstance(transport, (dpkt.icmp.ICMP, dpkt.icmp6.ICMP6)):
        data.update(analysis_icmp(transport))
    else:
        print('analysis_pcap ip.data {}'.format(repr(transport)))
        # Keep the addresses already collected; just flag the unknown payload.
        data[ip.__class__.__name__] = {}
    return data


def analysis_udp(udp, key="UDP"):
    """Summarise a dpkt UDP datagram under *key*.

    Returns ``{key: {...}}`` with ports, length, checksum and the decoded
    payload, or ``{}`` if *udp* lacks the expected fields.
    """
    try:
        return {
            key: {
                'sport': udp.sport,
                'dport': udp.dport,
                'ulen': udp.ulen,
                'sum': udp.sum,
                'data': _parse_text_payload(udp.data)
            }
        }
    except AttributeError:
        print('analysis_udp udp {}'.format(repr(udp)))
        return {}


def analysis_tcp(tcp, key='TCP'):
    """Summarise a dpkt TCP segment under *key*.

    Adds an ``'HTTP'`` entry when the payload parses as an HTTP request or
    response, and a ``'Data'`` field when the payload is non-empty text.
    """
    data = {
        key: {
            'dport': tcp.dport,
            'sport': tcp.sport,
            'ack': tcp.ack,
            'seq': tcp.seq
        }
    }
    http = _parse_http(tcp.data)
    if http is not None:
        data['HTTP'] = http
    payload = _parse_text_payload(tcp.data)
    if payload:
        data[key]['Data'] = payload
    return data


def analysis_icmp(icmp, key='ICMP'):
    """Summarise a dpkt ICMP or ICMP6 message.

    The result is keyed ``'ICMP'`` or ``'ICMP6'`` according to the message
    class; *key* is accepted for signature compatibility and not used.
    Returns ``{}`` if the message cannot be decoded.
    """
    try:
        if isinstance(icmp, dpkt.icmp.ICMP):
            inner = getattr(icmp.data, 'data', b'')
            if isinstance(inner, dpkt.ip.IP):
                # The message quotes an IP packet: decode it recursively.
                nested = analysis_pcap(int(time.time()), inner)
            else:
                # Plain payload: it is not an Ethernet frame, so only try
                # to read it as text.
                nested = _parse_text_payload(inner)
            return {
                'ICMP': {
                    'type': icmp.type,
                    'sum': icmp.sum,
                    'Data': nested
                }
            }
        text = ''
        try:
            text = icmp.data.decode('utf-8')
        except (UnicodeDecodeError, AttributeError):
            pass
        return {
            'ICMP6': {
                'type': icmp.type,
                'sum': icmp.sum,
                'Data': text
            }
        }
    except _PARSE_ERRORS:
        return {}


def analysis_pcap2(timestamp, buf):
    """Alternative decoder: dissect *buf* with scapy and print every layer."""
    frame = Ether(buf)
    frame.show()


def get_dpkt(count=10, iface=None):
    """Sniff *count* packets and write them to a uniquely named pcap file.

    Args:
        count: number of packets to capture before returning.
        iface: interface name to capture on; ``None`` leaves the choice to
            scapy. Multi-NIC hosts should name it explicitly,
            e.g. ``iface="Qualcomm QCA9377 802.11ac Wireless Adapter"``.

    Returns:
        The name of the pcap file that was written.
    """
    sniff_kwargs = {'count': count}
    if iface is not None:
        sniff_kwargs['iface'] = iface
    packets = sniff(**sniff_kwargs)
    filename = f"{uuid.uuid1()}.pcap"
    wrpcap(filename, packets)
    return filename


def main():
    """Capture, decode and print packets in an endless loop."""
    while True:
        filename = get_dpkt()
        try:
            with open(filename, "rb") as f:
                for timestamp, buf in dpkt.pcap.Reader(f):
                    try:
                        res = analysis_pcap(timestamp, buf)
                    except dpkt.UnpackError as err:
                        # One malformed frame must not stop the capture loop.
                        print('analysis_pcap failed: {}'.format(err))
                        continue
                    print(res)
        finally:
            # Remove the temporary capture only after the file is closed,
            # and even when decoding fails.
            os.remove(filename)


if __name__ == '__main__':
    main()
这篇关于 scapy 抓包使用的文章就介绍到这儿，希望我们推荐的文章对大家有所帮助，也希望大家多多支持为之网！
- 2024-05-15 PingCAP 黄东旭参与 CCF 秀湖会议，共探开源教育未来
- 2024-05-13 PingCAP 戴涛：构建面向未来的金融核心系统
- 2024-05-09 flutter3.x_macos桌面os实战
- 2024-05-09 Rust中的并发性：Sync 和 Send Traits
- 2024-05-08 使用Ollama和OpenWebUI在CPU上玩转Meta Llama3-8B
- 2024-05-08 完工标准（DoD）与验收条件（AC）究竟有什么不同？
- 2024-05-08 4万 star 的 NocoDB 在 sealos 上一键起，轻松把数据库变成智能表格
- 2024-05-08 Mac 版Stable Diffusion WebUI的安装
- 2024-05-08 解锁CodeGeeX智能问答中3项独有的隐藏技能
- 2024-05-08 RAG算法优化+新增代码仓库支持，CodeGeeX的@repo功能效果提升