/
test_fsck.pyx
206 lines (150 loc) · 4.95 KB
/
test_fsck.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import cython
import sys
from z3 import *
# from disk import *
from diskimpl cimport Concat32
from xv6inode cimport IndirectInodeDisk
from xv6inode import create_fuse_inode
def evalexp(v, *args, **kwargs):
    """Reduce an expression to a plain ``bool``.

    ``v`` is unwrapped repeatedly: a callable is called, and an object
    exposing ``eval`` has that method invoked.  Only the first unwrapping
    step receives ``*args`` / ``**kwargs``; every later step is performed
    without arguments.  Unwrapping stops as soon as a ``bool`` is reached.

    Any other value is printed and then trips ``assert False``.
    """
    while True:
        if callable(v):
            v = v(*args, **kwargs)
        elif hasattr(v, 'eval'):
            v = v.eval(*args, **kwargs)
        elif isinstance(v, bool):
            return v
        else:
            print(v)
            assert False
            # Only reached when assertions are compiled out.
            return None
        # Intermediate results are re-evaluated without any arguments.
        args, kwargs = (), {}
class Node(object):
    """Common base class for the expression-tree node types in this file."""
class Lambda(Node):
    """Expression node that wraps a plain callable."""

    def __init__(self, fn):
        # fn: the callable invoked on every evaluation.
        self._fn = fn

    def eval(self, *args, **kwargs):
        """Call the wrapped callable and reduce its result with ``evalexp``.

        The ``ranges`` keyword, if present, is removed first and is not
        forwarded to the callable.
        """
        if 'ranges' in kwargs:
            del kwargs['ranges']
        outcome = self._fn(*args, **kwargs)
        return evalexp(outcome)
class Quantifier(Node):
    """Base class for quantifiers evaluated by brute-force enumeration.

    Subclasses supply three hooks:

    * ``handle(fargs, value)`` -- return a non-``None`` verdict to stop the
      enumeration early, or ``None`` to keep going;
    * ``done()`` -- the verdict when the enumeration runs to completion;
    * ``_z3(bindings, body)`` -- build the corresponding z3 term.
    """

    def __init__(self, bindings, expression):
        # Bare callables are wrapped so the body always exposes ``eval``.
        self._bindings = bindings
        self._expression = Lambda(expression) if callable(expression) else expression

    def eval(self, *args, **kwargs):
        """Evaluate the quantifier; only a single bound variable is supported."""
        if len(self._bindings) != 1:
            assert(False)
            # Only reached when assertions are compiled out.
            return None
        return self.eval1(*args, **kwargs)

    def eval1(self, *args, **kwargs):
        """Enumerate the single bound variable and evaluate the body for each value.

        The enumeration interval comes from ``kwargs['ranges'][variable]``:
        a two-element ``[lo, hi]`` list is used as the half-open interval
        ``lo <= x < hi``, and a one-element ``[hi]`` list means
        ``0 <= x < hi``.  With no entry for the variable the interval
        defaults to ``[0, 2 ** variable.size())``.

        Each candidate is appended to the positional arguments before the
        body is evaluated.  The first non-``None`` result from ``handle``
        is returned; otherwise ``done()`` decides.
        """
        bounds_by_var = kwargs.get('ranges', {})
        bound_var = self._bindings[0]
        span = bounds_by_var.get(bound_var, [2 ** bound_var.size()])
        if len(span) == 1:
            span = [0, span[0]]

        candidate = span[0]
        while candidate < span[1]:
            call_args = args + (candidate,)
            body_value = evalexp(self._expression, *call_args, **kwargs)
            verdict = self.handle(call_args, body_value)
            if verdict is not None:
                return verdict
            candidate += 1
        return self.done()

    def z3(self, *args, **kwargs):
        """Build the z3 term: the body's ``z3`` receives the bindings as extra arguments."""
        body_args = args + tuple(self._bindings)
        body = self._expression.z3(*body_args, **kwargs)
        return self._z3(self._bindings, body)
class ForAll(Quantifier):
    """Universal quantifier: holds unless some enumerated value falsifies the body."""

    def _z3(self, *args, **kwargs):
        # NOTE(review): relies on the name ``z3`` being bound at module
        # level; only ``from z3 import *`` is visible here -- confirm.
        return z3.ForAll(*args, **kwargs)

    def handle(self, arg, res):
        """Return ``False`` for a falsy body value, ``None`` to keep enumerating."""
        if res:
            return None
        return False

    def done(self):
        """No counterexample was found during enumeration."""
        return True
class Exists(Quantifier):
    """Existential quantifier: holds as soon as some enumerated value satisfies the body."""

    def _z3(self, *args, **kwargs):
        # NOTE(review): relies on the name ``z3`` being bound at module
        # level; only ``from z3 import *`` is visible here -- confirm.
        return z3.Exists(*args, **kwargs)

    def handle(self, arg, res):
        """Return ``True`` for a truthy body value, ``None`` to keep enumerating."""
        if not res:
            return None
        return True

    def done(self):
        """No witness was found during enumeration."""
        return False
class Implies(Node):
    """Logical implication ``A => B`` between two expressions."""

    def __init__(self, A, B):
        self.A, self.B = A, B

    def z3(self, *args, **kwargs):
        """Build the z3 implication; the operands are passed through unchanged
        and the call arguments are ignored."""
        return z3.Implies(self.A, self.B)

    def eval(self, *args, **kwargs):
        """Evaluate the implication; ``B`` is only evaluated when ``A`` holds."""
        if not evalexp(self.A, *args, **kwargs):
            # A false antecedent makes the implication true.
            return True
        return evalexp(self.B, *args, **kwargs)
class And(Node):
    """Conjunction over any number of child expressions."""

    def __init__(self, *children):
        self._children = children

    def eval(self, *args, **kwargs):
        """Return ``True`` when every child evaluates truthy.

        Children are evaluated in order and evaluation stops at the first
        falsy one.  With no children the result is ``True``.
        """
        return all(evalexp(child, *args, **kwargs) for child in self._children)
class Not(Node):
    """Logical negation of a single child expression."""

    def __init__(self, child):
        # child: a bool, or anything ``evalexp`` can reduce to one.
        self._child = child

    def z3(self, disk):
        """Build the z3 negation of the child.

        Only ``self._child`` is ever stored on this class, and ``z3.Not``
        takes exactly one operand, so the single child is negated directly.
        """
        # NOTE(review): assumes the child is a callable that yields a z3
        # term when given ``disk``, matching the ``f(disk)`` call shape this
        # method has always used -- confirm against callers.
        return z3.Not(self._child(disk))

    def eval(self, *args, **kwargs):
        """Return the negation of the child's truth value.

        The child is reduced through ``evalexp`` so that plain bools,
        nodes exposing ``eval`` and bare callables are all accepted, in
        line with ``And`` and ``Implies``.
        """
        return not evalexp(self._child, *args, **kwargs)
class SAMap(object):
    """Single-assignment map: a key may be bound once, and only re-bound to an equal value.

    Looking a key up (by call or by subscript) does not return the stored
    value; it returns an ``SAEQ`` handle over the shared dictionary.
    """

    def __init__(self):
        self._map = {}

    def __getitem__(self, arg):
        return SAEQ(self._map, arg)

    def __call__(self, arg):
        # Calling the map is shorthand for subscripting it.
        return self[arg]

    def __setitem__(self, arg, value):
        # A key that is already bound may only be re-bound to an equal value.
        if arg in self._map:
            assert self._map[arg] == value
        self._map[arg] = value

    def __str__(self):
        return str(self._map)
class SAEQ(object):
    """Handle for one key of a shared dictionary whose ``==`` binds on first use."""

    def __init__(self, map, arg):
        self._map = map
        self._arg = arg

    def __eq__(self, value):
        """Compare ``value`` with the binding for this key.

        If the key is not yet present in the dictionary, ``value`` is
        stored under it and the comparison succeeds.  Otherwise the result
        is whether the stored value equals ``value``.
        """
        key = self._arg
        if key not in self._map:
            self._map[key] = value
            return True
        return self._map[key] == value
@cython.locals(impl=IndirectInodeDisk)
def run():
    """Check an inode disk image for mapping consistency and print the result.

    The disk is created from the command-line arguments (``sys.argv[1:]``).
    For inode numbers 1..99 and offsets 0..511 the following must hold:

    * every mapped (inode, offset) pair points at a block that is not free;
    * the mapping is injective, i.e. no block is the target of two
      different (inode, offset) pairs.

    The boolean outcome is printed, followed by the block -> (inode, offset)
    reverse map that was recorded while checking.
    """
    def predicate(impl, ino, off, pre_reverse_map=None):
        # Mapped blocks should be allocated
        allocated = Implies(
            impl.is_mapped(Concat32(ino, off)),
            Not(impl.is_free(impl.mappingi(Concat32(ino, off)))))
        # Mapping should be injective
        injective = Implies(
            impl.is_mapped(Concat32(ino, off)),
            pre_reverse_map(impl.mappingi(Concat32(ino, off))) == Concat32(ino, off))
        return And(allocated, injective)

    ino = BitVec('ino', 32)
    off = BitVec('off', 32)

    per_offset = ForAll([off], predicate)
    pre = ForAll([ino], per_offset)

    func = SAMap()

    impl = create_fuse_inode(sys.argv[1:])._idisk

    # Corrupt the image
    # impl.begin_tx()
    # impl._idisk.free(impl.mappingi(Concat32(2, 0)))
    # print impl.is_mapped(Concat32(2, 0))
    # impl.commit_tx()

    bounds = {ino: [1, 100], off: [0, 512]}
    print(pre.eval(impl, pre_reverse_map=func, ranges=bounds))

    print(func)