I am trying to figure out how to handle interrupts in Pynq.
I found and followed an example on how to manage interrupts. The problem is that my code never gets past the “await timer.interrupt.wait()” line (which is shown in the picture below).
I followed the steps from the link above, and I am not sure why the code is stuck on this line. I can see in the ILA that the interrupt occurs, but the signal then stays high because the code never reaches the line where the interrupt should be cleared.
Does anyone know what could cause this kind of behavior?
I am using a PYNQ-Z2 board with the v3.0.1 SD card image.
This is my block design (I have highlighted the lines that connect the interrupts from the IP cores to the Interrupt Controller, as well as the output from the Interrupt Controller to the Zynq PS):
Hi, I ran into the same issue this week. My case is as follows:
# import things like threading/time/asyncio ...
import asyncio
import threading
import time


class my_thread(threading.Thread):
    def __init__(self, intr, func, name='my-thread'):
        super(my_thread, self).__init__(name=name)
        self.intr = intr
        self.func = func
        self.stop_event = threading.Event()
        self.stop_event.clear()

    def run(self):
        self.func(self.stop_event, self.intr)
def my_func(stop_event, intr):
    global flag
    flag = False

    async def irq_test(stop_event, intr):
        global flag
        cnt = 0
        start = time.time()
        print("irq_test started")
        while not stop_event.is_set():
            print("irq_test continue?", (not stop_event.is_set()))
            #await asyncio.sleep(1)
            await intr.wait()
            cnt = cnt + 1
            flag = True
            print("cnt:", cnt)
        print("irq_test stopped")
        stop = time.time()
        gap = stop - start
        fps = cnt / gap
        print(gap, cnt, fps)

    async def work2(stop_event):
        global flag
        cnt2 = 0
        while not stop_event.is_set():
            await asyncio.sleep(0.001)
            if flag == True:
                cnt2 = cnt2 + 1
                flag = False
        print("work2 cnt:", cnt2)

    async def all_work(stop_event, intr):
        results = await asyncio.gather(*(irq_test(stop_event, intr), work2(stop_event)))
        return results

    asyncio.run(all_work(stop_event, intr))
    print("finished")
# some code to start the interrupt signal of intr_obj.dev; intr is an Interrupt object
intr = intr_obj.dev.irq
thd = my_thread(intr, my_func)
stp = thd.stop_event
thd.start()
time.sleep(10)
stp.set()
These two coroutines, irq_test and work2, work well until stop_event is set and work2 finishes, after which irq_test stays blocked in await intr.wait().
However, if I replace intr.wait() with asyncio.sleep(1), irq_test finishes successfully once stop_event is set.
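
One possible reading of this difference (an assumption, not something I have confirmed on the board) is that asyncio.sleep(1) always returns, so the loop re-checks stop_event, while a bare intr.wait() only returns if another interrupt actually arrives. A minimal sketch of a workaround under that assumption is to bound each wait with asyncio.wait_for. The names wait_irq_or_stop, irq_wait, fake_irq and poll_s are hypothetical, and fake_irq is a plain asyncio.Event standing in for the real Interrupt object so the example runs without hardware:

```python
import asyncio
import threading


async def wait_irq_or_stop(irq_wait, stop_event, poll_s=0.05):
    """Count interrupts until stop_event is set.

    irq_wait is any zero-argument callable returning an awaitable; it stands
    in for intr.wait from the code above (hypothetical name).
    """
    count = 0
    while not stop_event.is_set():
        try:
            # Give up after poll_s seconds so the loop condition is re-checked
            # even when no further interrupt arrives.
            await asyncio.wait_for(irq_wait(), timeout=poll_s)
        except asyncio.TimeoutError:
            continue
        count += 1
    return count


async def demo():
    stop_event = threading.Event()
    fake_irq = asyncio.Event()  # stand-in for the hardware interrupt (assumption)

    async def irq_wait():
        await fake_irq.wait()
        fake_irq.clear()

    async def fire_then_stop():
        # Raise three simulated interrupts, then request a stop.
        for _ in range(3):
            await asyncio.sleep(0.01)
            fake_irq.set()
        await asyncio.sleep(0.01)
        stop_event.set()

    count, _ = await asyncio.gather(
        wait_irq_or_stop(irq_wait, stop_event), fire_then_stop()
    )
    return count
```

Calling asyncio.run(demo()) returns the number of simulated interrupts seen before the stop request. Note that asyncio.wait_for cancels the pending wait on timeout; whether the real intr.wait() tolerates being cancelled and retried like this is something I have not checked, so treat it as an experiment rather than a fix.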