summaryrefslogtreecommitdiffstats
path: root/pywrap
diff options
context:
space:
mode:
Diffstat (limited to 'pywrap')
-rw-r--r--pywrap/CMakeLists.txt1
-rwxr-xr-xpywrap/test_pcipywrap.py119
2 files changed, 120 insertions, 0 deletions
diff --git a/pywrap/CMakeLists.txt b/pywrap/CMakeLists.txt
index d4d75d1..4592c9a 100644
--- a/pywrap/CMakeLists.txt
+++ b/pywrap/CMakeLists.txt
@@ -19,3 +19,4 @@ SWIG_ADD_MODULE(pcipywrap python pcipywrap.i pcipywrap.c)
SWIG_LINK_LIBRARIES(pcipywrap ${PYTHON_LIBRARIES} pcilib)
configure_file(server.py server.py)
+configure_file(test_pcipywrap.py test_pcipywrap.py)
diff --git a/pywrap/test_pcipywrap.py b/pywrap/test_pcipywrap.py
new file mode 100755
index 0000000..257b4a5
--- /dev/null
+++ b/pywrap/test_pcipywrap.py
@@ -0,0 +1,119 @@
+import threading
+import pcipywrap
+import random
+import os
+import json
+import requests
+import time
+
+class test_pcipywrap():
+ def __init__(self, device, model, num_threads = 150,
+ write_percentage = 0.1, register = 'test_prop2',
+ server_host = 'http://localhost', server_port = 12412,
+ server_message_delay = 0):
+ #initialize enviroment variables
+ if not 'APP_PATH' in os.environ:
+ APP_PATH = ''
+ file_dir = os.path.dirname(os.path.abspath(__file__))
+ APP_PATH = str(os.path.abspath(file_dir + '/../..'))
+ os.environ["APP_PATH"] = APP_PATH
+
+ if not 'PCILIB_MODEL_DIR' in os.environ:
+ os.environ['PCILIB_MODEL_DIR'] = os.environ["APP_PATH"] + "/xml"
+ if not 'LD_LIBRARY_PATH' in os.environ:
+ os.environ['LD_LIBRARY_PATH'] = os.environ["APP_PATH"] + "/pcilib"
+
+ random.seed()
+ #create pcilib_instance
+ self.pcilib = pcipywrap.Pcipywrap(device, model)
+ self.num_threads = num_threads
+ self.write_percentage = write_percentage
+ self.register = register
+ self.server_message_delay = server_message_delay
+ self.server_port = server_port
+ self.server_host = server_host
+
+ def testThreadSafeReadWrite(self):
+ def threadFunc():
+ if random.randint(0, 100) >= (self.write_percentage * 100):
+ ret = self.pcilib.get_property('/test/prop2')
+ print self.register, ':', ret
+ del ret
+ else:
+ val = random.randint(0, 65536)
+ print 'set value:', val
+ self.pcilib.write_register(val, self.register)
+ try:
+ while(1):
+ thread_list = [threading.Thread(target=threadFunc) for i in range(0, self.num_threads)]
+ for i in range(0, self.num_threads):
+ thread_list[i].start()
+ for i in range(0, self.num_threads):
+ thread_list[i].join()
+ print 'cycle done'
+ except KeyboardInterrupt:
+ print 'testing done'
+ pass
+
+ def testMemoryLeak(self):
+ try:
+ while(1):
+ #print self.pcilib.create_pcilib_instance('/dev/fpga0','test_pywrap')
+
+ print self.pcilib.get_property_list('/test')
+ print self.pcilib.get_register_info('test_prop1')
+ #print self.pcilib.get_registers_list();
+
+ #print self.pcilib.read_register('reg1')
+ #print self.pcilib.write_register(12, 'reg1')
+
+ #print self.pcilib.get_property('/test/prop2')
+ #print self.pcilib.set_property(12, '/test/prop2')
+ except KeyboardInterrupt:
+ print 'testing done'
+ pass
+
+ def testServer(self):
+ url = str(self.server_host + ':' + str(self.server_port))
+ headers = {'content-type': 'application/json'}
+ payload =[{'com': 'open', 'data2' : '12341'},
+ #{'command': 'open', 'device' : '/dev/fpga0', 'model': 'test_pywrap'},
+ {'command': 'help'},
+ {'command': 'get_registers_list'},
+ {'command': 'get_register_info', 'reg': 'reg1'},
+ {'command': 'get_property_list'},
+ {'command': 'read_register', 'reg': 'reg1'},
+ {'command': 'write_register', 'reg': 'reg1'},
+ {'command': 'get_property', 'prop': '/test/prop2'},
+ {'command': 'set_property', 'prop': '/test/prop2'}]
+
+ def sendRandomMessage():
+ message_number = random.randint(1, len(payload) - 1)
+ print 'message number: ', message_number
+ payload[message_number]['value'] = random.randint(0, 65535)
+ r = requests.get(url, data=json.dumps(payload[message_number]), headers=headers)
+ print json.dumps(r.json(), sort_keys=True, indent=4, separators=(',', ': '))
+
+ try:
+ r = requests.get(url, data=json.dumps(payload[1]), headers=headers)
+ print json.dumps(r.json(), sort_keys=True, indent=3, separators=(',', ': '))
+
+ while(1):
+ time.sleep(self.server_message_delay)
+ thread_list = [threading.Thread(target=sendRandomMessage) for i in range(0, self.num_threads)]
+ for i in range(0, self.num_threads):
+ thread_list[i].start()
+ for i in range(0, self.num_threads):
+ thread_list[i].join()
+ print 'cycle done'
+
+ except KeyboardInterrupt:
+ print 'testing done'
+ pass
+
+if __name__ == '__main__':
+ lib = test_pcipywrap('/dev/fpga0','test_pywrap', num_threads = 150,
+ write_percentage = 0.1, register = 'test_prop2',server_host = 'http://localhost', server_port = 12412,
+ server_message_delay = 0)
+ lib.testThreadSafeReadWrite()
+