import errno import os import os.path import sys import time
from driller import Driller
def save_input(content, dest_dir, count):
    """Save a new input to a file where AFL can find it.

    The file is named ``id:XXXXXX,driller`` (where ``XXXXXX`` is ``count``
    zero-padded to six digits) and is placed in ``dest_dir``.

    :param content: Bytes to write; the file is opened in binary mode.
    :param dest_dir: Directory the file is created in. It is not created
                     here, so it must already exist.
    :param count: Integer used to build the file name.
    """
    name = 'id:%06d,driller' % count
    with open(os.path.join(dest_dir, name), 'wb') as destfile:
        destfile.write(content)
# NOTE(review): `dest_dir`, `source_dir`, `binary` and `fuzzer_bitmap` are
# defined outside the lines visible here.

# Make sure destination exists
try:
    os.makedirs(dest_dir)
except OSError as e:
    # An already-existing destination is fine; anything else is a real error.
    if e.errno != errno.EEXIST:
        raise

seen = set()  # Keeps track of source files already drilled
# Number of entries already present in the destination (driller queue) directory
count = len(os.listdir(dest_dir))  # Helps us name outputs correctly

# Repeat forever in case AFL finds something new
while True:
    # Go through all of the files AFL has generated, but only once each
    for source_name in os.listdir(source_dir):
        # Skip inputs that were already processed and files that are not AFL inputs.
        if source_name in seen or not source_name.startswith('id:'):
            continue
        # Remember the input, then use it as the seed for concolic execution.
        seen.add(source_name)
        # Read the content of the seed file.
        with open(os.path.join(source_dir, source_name), 'rb') as seedfile:
            seed = seedfile.read()

        # Drill the seed as-is, then try a larger input too because Driller
        # won't do it for you.
        for candidate in (seed, seed + b'0000'):
            print('Drilling input: %s' % candidate)
            # Hand the seed and AFL's bitmap to Driller and save every input it
            # generates into the destination directory.
            for _, new_input in Driller(binary, candidate, fuzzer_bitmap).drill_generator():
                save_input(new_input, dest_dir, count)
                count += 1
    time.sleep(10)
def __init__(self, binary, input_str, fuzz_bitmap=None, tag=None, redis=None, hooks=None, argv=None):
    """
    :param binary     : The binary to be traced.
    :param input_str  : Input string to feed to the binary.
    :param fuzz_bitmap: AFL's bitmap of state transitions (defaults to empty).
    :param tag        : Not described in the original docstring -- TODO confirm its meaning.
    :param redis      : redis.Redis instance for coordinating multiple Driller instances.
    :param hooks      : Dictionary of addresses to simprocedures.
    :param argv       : Optionally specify argv params (i.e.: ['./calc', 'parm1']),
                        defaults to binary name with no params.
    """
    # NOTE(review): only the signature and docstring are shown in this excerpt.
def _drill_input(self):
    """
    Symbolically step down a path with a tracer, trying to concretize inputs for unencountered
    state transitions.

    In other words: follow the path given by the tracer step by step and solve for inputs
    that reach state transitions which have not been encountered yet.

    This is a generator. It yields every non-None result of ``self._writeout`` for a diverted
    state, followed by everything ``self._symbolic_explorer_stub`` yields for that state.
    """

    # initialize the tracer
    # Instantiate the QEMURunner for the binary and the concrete input.
    r = tracer.qemu_runner.QEMURunner(self.binary, self.input, argv=self.argv)
    # Create the angr project.
    p = angr.Project(self.binary)
    # Install every hook that was passed in.
    for addr, proc in self._hooks.items():
        p.hook(addr, proc)
        l.debug("Hooking %#x -> %s...", addr, proc.display_name)

    # Choose the initial state based on the OS of the binary under analysis.
    if p.loader.main_object.os == 'cgc':
        p.simos.syscall_library.update(angr.SIM_LIBRARIES['cgcabi_tracer'])

        s = p.factory.entry_state(stdin=angr.SimFileStream, flag_page=r.magic, mode='tracing')
    else:
        # Everything except cgc uses full_init_state, with stdin as a
        # SimFileStream and mode set to 'tracing'.
        # NOTE(review): per the original author's notes, 'tracing' mode records path
        # execution information (instructions, memory accesses) -- confirm against angr docs.
        s = p.factory.full_init_state(stdin=angr.SimFileStream, mode='tracing')

    # Preconstrain stdin (s.posix.stdin) to self.input, the testcase given to Driller.
    # NOTE(review): the original author notes that preconstraints can be removed later and
    # that this is presumably how angr realises concolic execution -- confirm.
    s.preconstrainer.preconstrain_file(self.input, s.posix.stdin, True)

    # NOTE(review): author's notes -- save_unsat keeps unsatisfiable states in the 'unsat'
    # stash; hierarchy refers to a StateHierarchy object tracking state relationships.
    simgr = p.factory.simulation_manager(s, save_unsat=True, hierarchy=False, save_unconstrained=r.crash_mode)

    # Build the Tracer technique.
    # NOTE(review): author's notes on the arguments --
    #   trace:       the execution path to follow.
    #   crash_addr:  address at which the program crashed; the crashing state is recorded
    #                in the 'crashed' stash.
    #   copy_states: whether states are copied while tracing.
    #   follow_unsat: whether unsatisfiable paths keep being followed.
    t = angr.exploration_techniques.Tracer(trace=r.trace, crash_addr=r.crash_addr, copy_states=True, follow_unsat=True)
    self._core = angr.exploration_techniques.DrillerCore(trace=r.trace, fuzz_bitmap=self.fuzz_bitmap)

    # Exploration techniques used below (descriptions quoted from their docstrings):
    #   Tracer():      An exploration technique that follows an angr path with a concrete
    #                  input. The tracing result is the state at the last address of the
    #                  trace, which can be found in the 'traced' stash. If the given concrete
    #                  input makes the program crash, you should provide crash_addr, and the
    #                  crashing state will be found in the 'crashed' stash.
    #   Oppologist():  The Oppologist is an exploration technique that forces uncooperative
    #                  code through qemu.
    #   DrillerCore(): An exploration technique that symbolically follows an input looking
    #                  for new state transitions. It has to be used with Tracer exploration
    #                  technique. Results are put in 'diverted' stash.
    simgr.use_technique(t)
    simgr.use_technique(angr.exploration_techniques.Oppologist())
    simgr.use_technique(self._core)

    l.debug("Drilling into %r.", self.input)
    l.debug("Input is %r.", self.input)

    while simgr.active and simgr.one_active.globals['trace_idx'] < len(r.trace) - 1:
        simgr.step()

        # Check here to see if a crash has been found.
        if self.redis and self.redis.sismember(self.identifier + '-finished', True):
            return

        # No 'diverted' stash yet means there is nothing to solve at this step.
        # NOTE(review): 'diverted' presumably marks states where a new state transition
        # was found; the author notes the stash is filled by DrillerCore's step method.
        if 'diverted' not in simgr.stashes:
            continue

        # Drain the diverted states one at a time.
        while simgr.diverted:
            state = simgr.diverted.pop(0)
            l.debug("Found a diverted state, exploring to some extent.")
            # First call _writeout for the diverted state itself.
            w = self._writeout(state.history.bbl_addrs[-1], state)
            if w is not None:
                yield w
            # Then explore onward from the diverted state.
            for i in self._symbolic_explorer_stub(state):
                yield i
This package is in a bit of a complicated transition phase: it originally housed the concolic tracing helpers for angr, but those pieces of code have since been merged into angr proper.
Tracer(): An exploration technique that follows an angr path with a concrete input. The tracing result is the state at the last address of the trace, which can be found in the 'traced' stash. If the given concrete input makes the program crash, you should provide crash_addr, and the crashing state will be found in the 'crashed' stash.
Oppologist(): The Oppologist is an exploration technique that forces uncooperative code through qemu.
DrillerCore(): An exploration technique that symbolically follows an input looking for new state transitions. It has to be used with the Tracer exploration technique. Results are put in the 'diverted' stash.
def use_technique(self, tech):
    """
    Use an exploration technique with this SimulationManager.

    Techniques can be found in :mod:`angr.exploration_techniques`.

    :param tech:    An ExplorationTechnique object that contains code to modify
                    this SimulationManager's behavior.
    :type tech:     ExplorationTechnique
    :return:        The technique that was added, for convenience
    :raises SimulationManagerError: if ``tech`` is not an ExplorationTechnique instance.
    """
    if not isinstance(tech, ExplorationTechnique):
        raise SimulationManagerError
    # NOTE(review): the excerpt ends here. The code that registers `tech` and
    # returns it (as the docstring promises) is not shown.
class ExplorationTechnique:
    """
    An ExplorationTechnique is a set of hooks for a simulation manager that assists in the
    implementation of new techniques in symbolic exploration.

    TODO: choose actual name for the functionality (techniques? strategies?)

    Any number of these methods may be overridden by a subclass.
    To use an exploration technique, call ``simgr.use_technique`` with an *instance* of the technique.
    """

    # this is the master list of hook functions
    # NOTE(review): the default implementations of these hooks are not shown in this
    # excerpt; the helpers below look each name up on ExplorationTechnique.
    _hook_list = ("step", "filter", "selector", "step_state", "successors")

    def _get_hooks(self):
        """Return ``{name: bound method}`` for every entry of ``_hook_list`` this instance overrides."""
        return {name: getattr(self, name) for name in self._hook_list if self._is_overriden(name)}

    def _is_overriden(self, name):
        """Return True when ``name`` on this instance runs different code than ``ExplorationTechnique``'s."""
        # Comparing code objects detects an override without calling anything.
        return getattr(self, name).__code__ is not getattr(ExplorationTechnique, name).__code__
class HookSet:
    """
    A HookSet is a static class that provides the capability to apply many hooks to an object.
    """

    @staticmethod
    def install_hooks(target, **hooks):
        """
        Given the target `target`, apply the hooks given as keyword arguments to it.
        If any targeted method has already been hooked, the hooks will not be overridden but will instead be pushed
        into a list of pending hooks. The final behavior should be that all hooks call each other in a nested stack.

        :param target:  Any object. Its methods named as keys in `hooks` will be replaced by `HookedMethod` objects.
        :param hooks:   Any keywords will be interpreted as hooks to apply. Each method named will hooked with the
                        corresponding function value.
        """
        for name, hook in hooks.items():
            func = getattr(target, name)
            if not isinstance(func, HookedMethod):
                # First hook on this method: wrap the original in a HookedMethod so that
                # later calls go through HookedMethod.__call__.
                func = HookedMethod(func)
                setattr(target, name, func)
            # The newest hook goes on top of the pending stack.
            func.pending.append(hook)
class HookedMethod:
    """
    HookedMethod is a callable object which provides a stack of nested hooks.

    :param func:    The bottom-most function which provides the original functionality that is being hooked

    :ivar func:     Same as the eponymous parameter
    :ivar pending:  The stack of hooks that have yet to be called. When this object is called, it will pop the last
                    function in this list and call it. The function should call this object again in order to request
                    the functionality of the original method, at which point the pop-dispatch mechanism will run
                    recursively until the stack is exhausted, at which point the original function will be called.
                    When the call returns, the hook will be restored to the stack.
    """

    def __init__(self, func):
        self.func = func
        self.pending = []  # start with an empty stack of pending hooks

    # ... (other members elided in this excerpt)

    # Nested (not plainly recursive) execution.
    # NOTE(review): author's notes -- the step function was instrumented with prints to watch
    # the nesting; many "orig step is called" lines appear first, reportedly from the
    # initialisation up to full_init_state, and the techniques' own step hooks only run once
    # use_technique has finished.
    def __call__(self, *args, **kwargs):
        if self.pending:
            current_hook = self.pending.pop()
            try:
                # Nested dispatch point: the hook receives the object the original
                # method is bound to, followed by the call's arguments.
                result = current_hook(self.func.__self__, *args, **kwargs)
            finally:
                # Runs whether or not the hook raised, so the hook is always restored.
                self.pending.append(current_hook)
            return result
        else:
            return self.func(*args, **kwargs)

    # ... (remainder elided in this excerpt)
n = 0  # current hook nesting depth, updated by the instrumented __call__ below

# ... (elided in this excerpt)


class Hookset:
    """Debug-instrumented variant of the hook dispatcher that prints the nesting depth.

    NOTE(review): the logic mirrors ``HookedMethod.__call__``; the class name is kept as
    written in the excerpt.
    """

    def __call__(self, *args, **kwargs):
        """Dispatch to the top pending hook (or the original function), tracing depth in ``n``."""
        global n
        if self.pending:
            # pop() must be called: without the parentheses the bound `pop` method itself
            # would be invoked as the hook.
            current_hook = self.pending.pop()
            print("current_hook is ", current_hook)
            n = n + 1
            print("Entering , n = ", n)
            try:
                result = current_hook(self.func.__self__, *args, **kwargs)
            finally:
                # Always undo the depth bump and restore the hook, even on error.
                n = n - 1
                print("Exiting, n = ", n)
                self.pending.append(current_hook)
            return result
        else:
            return self.func(*args, **kwargs)
l.debug("Drilling into %r.", self.input) l.debug("Input is %r.", self.input)
while simgr.active and simgr.one_active.globals['trace_idx'] < len(r.trace) - 1: simgr.step()
# Check here to see if a crash has been found. if self.redis and self.redis.sismember(self.identifier + '-finished', True): return # diverted 应该是发生状态转移 if'diverted'notin simgr.stashes: continue # 当检测到状态转移时,弹出状态 这里的diverted 是在DillerCore探索策略中的step方法中添加的 while simgr.diverted: state = simgr.diverted.pop(0) l.debug("Found a diverted state, exploring to some extent.") # 首先调用_writeout来求解 w = self._writeout(state.history.bbl_addrs[-1], state) if w isnotNone: yield w for i in self._symbolic_explorer_stub(state): yield i
# Checks here to see if the generation is worth writing to disk. # If we generate too many inputs which are not really different we'll seriously slow down AFL. # 调用_in_catalogue判断生成的input是否在之前的运行或者其他线程已经生成了,没有生成的话返回False # <----跟redis相关,暂时不用管 if self._in_catalogue(*key): # 如果生成了,那就从encounters中移除(addr,addr)?为什么要移除? self._core.encounters.remove((prev_addr, state.addr)) returnNone
else: self._add_to_catalogue(*key) # ----->
l.debug("[%s] dumping input for %#x -> %#x.", self.identifier, prev_addr, state.addr) # 把本次生成的值 以(key,generated)的形式加入到_generated集合里 self._generated.add((key, generated)) # <-------redis 相关,暂时不管 if self.redis: # Publish it out in real-time so that inputs get there immediately. channel = self.identifier + '-generated'
def _symbolic_explorer_stub(self, state):
    """
    Generator that yields every non-None ``self._writeout`` result for the satisfiable
    states left over after symbolic exploration from ``state``.

    :param state: The state to explore from.
    """
    # Create a new simulation manager and step it forward up to 1024
    # accumulated active states or steps.
    steps = 0  # step counter, used to bound the exploration
    accumulated = 1  # accumulated active states/steps, used as the stop condition

    # NOTE(review): the excerpt omits the code between here and the log call below;
    # `simgr` is not defined in the visible lines -- confirm against the full source.

    l.debug("[%s] stopped symbolic exploration at %s.", self.identifier, time.ctime())

    # DO NOT think this is the same as using only the deadended stashes. this merges deadended and active
    simgr.stash(from_stash='deadended', to_stash='active')
    # Walk every state that is now in the active stash.
    for dumpable in simgr.active:
        try:
            # Only satisfiable states are handed to _writeout.
            if dumpable.satisfiable():
                w = self._writeout(dumpable.history.bbl_addrs[-1], dumpable)
                if w is not None:
                    yield w

        # If the state we're trying to dump wasn't actually satisfiable.
        except IndexError:
            pass