-
Notifications
You must be signed in to change notification settings - Fork 1
/
protocol.py
360 lines (283 loc) · 14.7 KB
/
protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
"""Файл со всеми логическими компонентами системы. Тут и RT-MESI, и политика замещения,
и сама логика взаимодействия всех компонентов."""
from __future__ import annotations
from copy import copy
from typing import List
class RAM:
def __init__(
self,
size,
read_callback=lambda: print("RAM READ"),
write_callback=lambda: print("RAM WRITE"),
):
self.size = size
self.read_callback = read_callback
self.write_callback = write_callback
self.reset()
def reset(self):
self.data = [0] * self.size
def read(self, address: int):
self.read_callback()
return self.data[address]
def write(self, data, address: int):
self.write_callback()
self.data[address] = data
class CacheController:
"""Кэш контроллер это самый главный элемент в системе, её главное связующее звено.
Он принимает запросы на чтение и запись от процессоров, работает с кэшами
и оперативной памятью. Как раз в нём и реализуется логика RT-MESI протокола.
См. https://en.wikipedia.org/wiki/Cache_coherency_protocols_(examples)#RT-MESI_protocol
"""
def __init__(
self,
ram: RAM,
cpus: List[CPU],
cach_lines_count: int,
cach_channels_count: int,
read_miss_callback=lambda cpu_index: print(f"READ MISS {cpu_index}"),
intervention_callback=lambda cpu_index: print(f"INTERVENTION {cpu_index}"),
state_callback=lambda cpu_index: print(f"CHANGE STATES {cpu_index}"),
):
self.ram = ram
self.cpus: List[CPU] = []
self.cach_lines_count = cach_lines_count
self.cach_channels_count = cach_channels_count
self.read_miss_callback = read_miss_callback
self.intervention_callback = intervention_callback
self.state_callback = state_callback
for cpu in cpus:
self._add_cpu(cpu)
def _add_cpu(self, cpu: CPU):
"""Подключет CPU к кэш контроллеру."""
self.cpus.append(cpu)
cpu.cache_controller = self
if cpu.cache is None:
cpu.cache = Cache(self.cach_lines_count, self.cach_channels_count)
def _get_address_states(self, address: int):
"""Ищет адрес во всех кэшах и возвращает список его состояний.
Может быть пустым, если адреса нет в кэшах. Состояние I игнорируется."""
address_states = []
for cpu in self.cpus:
cach_line = cpu.cache.get_cache_line_by_address(address)
if cach_line is not None:
address_states.append(cach_line.state)
return address_states
def _make_address_shared(self, address: int):
"""Ищет адрес во всех кэшах и присваивает ему состояние S."""
for cpu in self.cpus:
cach_line = cpu.cache.get_cache_line_by_address(address)
if cach_line is not None:
cach_line.state = "S"
def _get_data_from_m_or_t(self, address: int):
list = []
cach_line_returned = []
"""Ищет кэш строку с заданным адресом в состоянии M или T и возвращает лежащие
в ней данные. Меняет состояние на S."""
for i in range(len(self.cpus)):
cpu = self.cpus[i]
cach_line = cpu.cache.get_cache_line_by_address(address)
if cach_line is not None and cach_line.state in {"M", "T"}:
self.intervention_callback(cpu.index)
cach_line.state = "S"
list.append(i)
cach_line_returned.append(cach_line)
return [cach_line_returned, list]
def _get_data_from_e_or_r(self, address: int):
list = []
cach_line_returned = []
"""Ищет кэш строку с заданным адресом в состоянии E или R и возвращает лежащие
в ней данные. Меняет состояние на S."""
for i in range(len(self.cpus)):
cpu = self.cpus[i]
cach_line = cpu.cache.get_cache_line_by_address(address)
if cach_line is not None and cach_line.state in {"E", "R"}:
self.intervention_callback(cpu.index)
cach_line.state = "S"
list.append(i)
cach_line_returned.append(cach_line)
return [cach_line_returned, list]
def _make_address_invalid(self, address: int):
"""Ищет адрес во всех кэшах и устанавливает в состояниe I."""
for cpu in self.cpus:
cach_line = cpu.cache.get_cache_line_by_address(address)
if cach_line is not None:
cach_line.state = "I"
def read(self, source_cpu: CPU, address: int) -> int:
"""Обрабатывает запрос процессора на чтение данных по указанному адресу."""
b = True
# READ HIT - данные есть в кэше процессора, состояния никак не меняются
cache_line = source_cpu.cache.get_cache_line_by_address(address)
if cache_line is not None:
return [source_cpu.cache.read(address), False]
# READ MISS
address_states = self._get_address_states(address)
if not address_states:
# Данных нет в других кэшах, а значит они не являются разделяемыми
state = "E"
# Берём данные из оперативной памяти
data = self.ram.read(address)
elif "M" in address_states or "T" in address_states:
# Данные есть в других кэшах
state = "T"
# Dirty Intervention
list = self._get_data_from_m_or_t(address)
self.state_callback(source_cpu.index,list[1])
data = list[0][0].data
elif "E" in address_states or "R" in address_states:
# Данные есть в других кэшах
state = "R"
# Shared Intervention
list = self._get_data_from_e_or_r(address)
self.state_callback(source_cpu.index,list[1])
data = list[0][0].data
elif "S" in address_states:
# Данные есть в других кэшах только в состоянии S
state = "R"
# Берём данные из оперативной памяти
data = self.ram.read(address)
self.read_miss_callback(source_cpu.index,b)
# Записываем в кэш процессора
replaced_cache_line = source_cpu.cache.write(state, data, address)
if replaced_cache_line is not None and replaced_cache_line.state in {"T", "M"}:
# Copy-Back
self.ram.write(replaced_cache_line.data, replaced_cache_line.address)
# Возвращаем запрашиваемые данные процессору
return [data, True]
def write(self, source_cpu: CPU, data, address: int):
"""Обрабатывает запрос процессора на запись данных по указанному адресу."""
# WRITE HIT - данные есть в кэше процессора
cach_line = source_cpu.cache.get_cache_line_by_address(address)
if cach_line is not None:
if cach_line.state in {"M", "E"}:
source_cpu.cache.write("M", data, address)
elif cach_line.state in {"T", "R", "S"}:
self._make_address_invalid(address)
self.read_miss_callback(source_cpu.index, 0)
source_cpu.cache.write("M", data, address)
return
# WRITE MISS
# Вариант, когда данных в кэше процессора нет, пока не рассматриваем.
# Вообще-то он нам и не нужен, так как мы всегда делаем инкремент,
# т. е. читаем данные (сохраняем в кэш) и увеличиваем на единицу
raise NotImplementedError
class CPU:
def __init__(
self,
index,
read_callback=lambda: print("CPU READ"),
write_callback=lambda: print("CPU WRITE"),
):
self.index = index
self.read_callback = read_callback
self.write_callback = write_callback
self.cache_controller: CacheController = None
self.cache: Cache = None
def read(self, address: int, from_increment: bool = False) -> int:
algoritm = self.cache_controller.read(self, address)
if from_increment:
self.read_callback(-1)
else:
self.read_callback(algoritm[1])
return algoritm[0]
def write(self, data, address: int):
#self.write_callback()
self.cache_controller.write(self, data, address)
def increment(self, address):
self.write_callback()
data = self.read(address, from_increment=True)
data = data + 1
self.write(data, address)
class CacheLine:
"""Кэш строка - содержит состояние (одно из R, T, M, E, S или I), данные, адрес
данных в оперативной памяти, а также счётчик, который показывает, как давно было
последний запрос на чтение или запись к этой кэш строке."""
def __init__(self):
self.state: None | str = None
self.data: None | int = None
self.address: None | int = None
self.not_used_counter = 0
def increment_counter(self):
"""Увеличивает счётчик. Функция должна вызываться, когда поступает запрос
на чтение или запись к другим кэш строкам того же кэша."""
self.not_used_counter += 1
def read(self) -> None | int:
"""Получает данные из кэш строки и обнуляет счётчик."""
self.not_used_counter = 0
return self.data
def write(self, address, data):
"""Записывает данные в кэш строку и обнуляет счётчик."""
self.not_used_counter = 0
self.address = address
self.data = data
class Cache:
"""Наборно-ассоциативный кэш процессора. В нём реализована политика замещения MRU."""
def __init__(
self,
lines_count: int,
channels_count: int,
read_callback=lambda: print("CACHE READ"),
write_callback=lambda: print("CACHE WRITE"),
):
self.lines_count = lines_count
self.channels_count = channels_count
self.read_callback = read_callback
self.write_callback = write_callback
self.reset()
def reset(self):
self.channels = [
[CacheLine() for _ in range(self.lines_count)]
for _ in range(self.channels_count)
]
def _cache_lines_counter_increment(self):
for channel in self.channels:
for cache_line in channel:
cache_line.increment_counter()
def _choose_same_or_empty_line(self, address: int) -> None | CacheLine:
"""Возвращает кэш строку с указанным адресом, либо подбирает пустую или Invalid
строку, если такого адреса в кэше нет. Если пустых строк нет, возвращает None.
"""
line_index = address % self.lines_count
for cache_line in self.channels[line_index]:
if (
cache_line.data is None
or cache_line.state == "I"
or cache_line.address == address
):
return cache_line
return None
def _choose_line_to_replace(self, address: int) -> CacheLine:
"""Место для логики политики замещения. Сейчас это MRU."""
line_index = address % self.lines_count
choosen_line = self.channels[0][line_index]
for cache_line in self.channels[line_index]:
# MRU - выбираем строку, которая использовалась "наиболее недавно"
if cache_line.not_used_counter < choosen_line.not_used_counter:
choosen_line = cache_line
return choosen_line
def read(self, address: int) -> int:
"""Обрабатывае запрос на чтение адреса из кэша. Подразумевается, что при вызове
этой функции точно известно, что данные в кэше есть."""
self._cache_lines_counter_increment()
cache_line = self.get_cache_line_by_address(address)
return cache_line.read()
def write(self, state, data, address: int) -> None | CacheLine:
"""Записывает в кэш данные с указанным адресом и состоянием. Возвращает
замещённую кэш строку, либо None, если не пришлось делать замещение."""
self.write_callback()
self._cache_lines_counter_increment()
cache_line = self._choose_same_or_empty_line(address)
replaced_cache_line = None
if cache_line is None:
cache_line = self._choose_line_to_replace(address)
replaced_cache_line = copy(cache_line)
cache_line.state = state
cache_line.write(address, data)
return replaced_cache_line
def get_cache_line_by_address(self, address: int) -> None | CacheLine:
"""Находит кэш строку по заданному адресу. Возвращает None, если адреса
в кэше нет или он находится в состоянии I."""
line_index = address % self.lines_count
for cache_line in self.channels[line_index]:
if cache_line.state != "I" and address == cache_line.address:
return cache_line
return None