1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
import threading
import pcipywrap
import random
import os
import json
import requests
import time
class test_pcipywrap():
def __init__(self, device, model, num_threads = 150,
write_percentage = 0.1, register = 'test_prop2',
server_host = 'http://localhost', server_port = 12412,
server_message_delay = 0):
#initialize enviroment variables
if not 'APP_PATH' in os.environ:
APP_PATH = ''
file_dir = os.path.dirname(os.path.abspath(__file__))
APP_PATH = str(os.path.abspath(file_dir + '/../..'))
os.environ["APP_PATH"] = APP_PATH
if not 'PCILIB_MODEL_DIR' in os.environ:
os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml"
if not 'LD_LIBRARY_PATH' in os.environ:
os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib"
random.seed()
#create pcilib_instance
self.pcilib = pcipywrap.Pcipywrap(device, model)
self.num_threads = num_threads
self.write_percentage = write_percentage
self.register = register
self.server_message_delay = server_message_delay
self.server_port = server_port
self.server_host = server_host
def testThreadSafeReadWrite(self):
def threadFunc():
if random.randint(0, 100) >= (self.write_percentage * 100):
ret = self.pcilib.get_property('/test/prop2')
print self.register, ':', ret
del ret
else:
val = random.randint(0, 65536)
print 'set value:', val
self.pcilib.write_register(val, self.register)
try:
while(1):
thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)]
for i in range(0, self.num_threads):
thread_list[i].start()
for i in range(0, self.num_threads):
thread_list[i].join()
print 'cycle done'
except KeyboardInterrupt:
print 'testing done'
pass
def testMemoryLeak(self):
try:
while(1):
#print self.pcilib.create_pcilib_instance('/dev/fpga0','test_pywrap')
print self.pcilib.get_property_list('/test')
print self.pcilib.get_register_info('test_prop1')
#print self.pcilib.get_registers_list();
#print self.pcilib.read_register('reg1')
#print self.pcilib.write_register(12, 'reg1')
#print self.pcilib.get_property('/test/prop2')
#print self.pcilib.set_property(12, '/test/prop2')
except KeyboardInterrupt:
print 'testing done'
pass
def testServer(self):
url = str(self.server_host + ':' + str(self.server_port))
headers = {'content-type': 'application/json'}
payload =[{'com': 'open', 'data2' : '12341'},
#{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
{'command': 'help'},
{'command': 'get_registers_list'},
{'command': 'get_register_info', 'reg': 'reg1'},
{'command': 'get_property_list'},
{'command': 'read_register', 'reg': 'reg1'},
{'command': 'write_register', 'reg': 'reg1'},
{'command': 'get_property', 'prop': '/test/prop2'},
{'command': 'set_property', 'prop': '/test/prop2'}]
def sendRandomMessage():
message_number = random.randint(1, len(payload) - 1)
print 'message number: ', message_number
payload[message_number]['value'] = random.randint(0, 65535)
r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers)
print json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': '))
try:
r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
print json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))
while(1):
time.sleep(self.server_message_delay)
thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)]
for i in range(0, self.num_threads):
thread_list[i].start()
for i in range(0, self.num_threads):
thread_list[i].join()
print 'cycle done'
except KeyboardInterrupt:
print 'testing done'
pass
if __name__ == '__main__':
lib = test_pcipywrap('/dev/fpga0','test_pywrap', num_threads = 150,
write_percentage = 0.1, register = 'test_prop2',server_host = 'http://localhost', server_port = 12412,
server_message_delay = 0)
lib.testThreadSafeReadWrite()
|