Tutorial: PYNQ DMA (Part 2: Using the DMA from PYNQ)

This tutorial shows how to use the PYNQ DMA class to control an AXI DMA in a hardware design.
This is the second part of a DMA tutorial. PYNQ DMA tutorial (Part 1: Hardware design) shows how to build the Vivado hardware design used in this notebook.

All the source files for the tutorial are hosted in a GitHub repository, and this post is a Markdown version of a Jupyter notebook. The original notebook is in that repository and can be copied to your PYNQ board and run.

Introduction

This overlay consists of an AXI DMA and an AXI Stream FIFO (input and output AXI stream interfaces). The FIFO connects the input and output streams of the DMA in a loopback configuration and will be used to explore the DMA and test the PYNQ DMA class.

Instantiate and download the overlay

from pynq import Overlay

ol = Overlay("./dma_tutorial.bit")

We can check the IPs in this overlay using the IP dictionary (ip_dict). Note that the output shown here is cut short. The full output can be viewed if you run this code on your PYNQ-enabled board.

ol.ip_dict
{'axi_dma': {'addr_range': 65536,
  'device': <pynq.pl_server.device.XlnkDevice at 0xb3a5f8d0>,
  'driver': pynq.lib.dma.DMA,
  'fullpath': 'axi_dma',
  'gpio': {},
  'interrupts': {},
  'mem_id': 'S_AXI_LITE',
  'parameters': {'C_BASEADDR': '0x40400000',
   'C_DLYTMR_RESOLUTION': '125',
   'C_ENABLE_MULTI_CHANNEL': '0',
   'C_FAMILY': 'zynq',
   'C_HIGHADDR': '0x4040FFFF',
   'C_INCLUDE_MM2S': '1',
   'C_INCLUDE_MM2S_DRE': '0',
   'C_INCLUDE_MM2S_SF': '1',
   'C_INCLUDE_S2MM': '1',
   'C_INCLUDE_S2MM_DRE': '0',
   'C_INCLUDE_S2MM_SF': '1',
   'C_INCLUDE_SG': '0',
   'C_INCREASE_THROUGHPUT': '0',
   'C_MICRO_DMA': '0',
   'C_MM2S_BURST_SIZE': '8',
   'C_M_AXIS_MM2S_CNTRL_TDATA_WIDTH': '32',
   'C_M_AXIS_MM2S_TDATA_WIDTH': '64',
   'C_M_AXI_MM2S_ADDR_WIDTH': '32',
   'C_M_AXI_MM2S_DATA_WIDTH': '64',
   'C_M_AXI_S2MM_ADDR_WIDTH': '32',
   'C_M_AXI_S2MM_DATA_WIDTH': '64',
   'C_M_AXI_SG_ADDR_WIDTH': '32',
   'C_M_AXI_SG_DATA_WIDTH': '32',
   'C_NUM_MM2S_CHANNELS': '1',
   'C_NUM_S2MM_CHANNELS': '1',
   'C_PRMRY_IS_ACLK_ASYNC': '0',
   'C_S2MM_BURST_SIZE': '16',
   'C_SG_INCLUDE_STSCNTRL_STRM': '0',
   'C_SG_LENGTH_WIDTH': '26',
   'C_SG_USE_STSAPP_LENGTH': '0',
   'C_S_AXIS_S2MM_STS_TDATA_WIDTH': '32',
   'C_S_AXIS_S2MM_TDATA_WIDTH': '64',
   'C_S_AXI_LITE_ADDR_WIDTH': '10',
   'C_S_AXI_LITE_DATA_WIDTH': '32',
   'Component_Name': 'dma_example_axi_dma_0',

Check help for the DMA object

ol.axi_dma?

Create DMA instances

Using the label for the DMA listed above (axi_dma), we can get a handle to the DMA and create objects for its send and receive channels.

dma = ol.axi_dma
dma_send = ol.axi_dma.sendchannel
dma_recv = ol.axi_dma.recvchannel

Read DMA

In the following cells we will read some data from memory and write it to the FIFO.

The first step is to allocate the buffer. pynq.allocate will be used to allocate the buffer, and NumPy will be used to specify the type of the buffer.

from pynq import allocate
import numpy as np

data_size = 1000
input_buffer = allocate(shape=(data_size,), dtype=np.uint32)

The array can be used like any other NumPy array. We can write some test data to the array. Later the data will be transferred by the DMA to the FIFO.

for i in range(data_size):
    input_buffer[i] = i + 0xcafe0000
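
Because the buffer behaves like a NumPy array, the same test pattern can also be written without a Python loop. The sketch below is only an illustration: it uses a plain NumPy array as a stand-in for the buffer returned by pynq.allocate (an assumption, so that the snippet also runs away from the board). On the board, the slice assignment would target input_buffer itself.

```python
import numpy as np

data_size = 1000

# Stand-in for allocate(shape=(data_size,), dtype=np.uint32); a plain
# NumPy array is used here only so the sketch runs without a PYNQ board.
input_buffer = np.zeros(data_size, dtype=np.uint32)

# Fill the existing buffer in place. Slice assignment writes into the
# same underlying memory instead of rebinding the name to a new array.
input_buffer[:] = np.arange(data_size, dtype=np.uint32) + np.uint32(0xcafe0000)

# Show the first and last values of the pattern.
print(hex(input_buffer[0]), hex(input_buffer[-1]))
```

Using `input_buffer[:] = ...` rather than `input_buffer = ...` matters here: the second form would replace the DMA-capable buffer with an ordinary array.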

Let’s check the contents of the array. The data in the following cell will be sent from PS (DDR memory) to PL (streaming FIFO).

Print first few values of buffer

for i in range(10):
    print(hex(input_buffer[i]))
0xcafe0000
0xcafe0001
0xcafe0002
0xcafe0003
0xcafe0004
0xcafe0005
0xcafe0006
0xcafe0007
0xcafe0008
0xcafe0009

Now we are ready to carry out a DMA transfer from a memory block in DDR to the FIFO.

dma_send.transfer(input_buffer)

Write DMA

Let’s read the data back from the FIFO stream and write it to memory-mapped (MM) memory. The steps are similar.

We will prepare an empty array before reading data back from FIFO.

Print first few values of buffer

(Check buffer is empty)

output_buffer = allocate(shape=(data_size,), dtype=np.uint32)

for i in range(10):
    print('0x' + format(output_buffer[i], '02x'))
0x00
0x00
0x00
0x00
0x00
0x00
0x00
0x00
0x00
0x00

dma_recv.transfer(output_buffer)
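
Note that transfer() only starts the DMA and returns; it does not wait for the data to arrive. A hedged sketch of a helper that starts both directions and then blocks until they finish is shown here. The name run_loopback is hypothetical (it is not part of PYNQ), and the helper assumes the channel objects provide transfer() and wait(), as the PYNQ DMA send and receive channels do.

```python
def run_loopback(send_channel, recv_channel, src, dst):
    """Start both directions, then block until each one completes.

    `run_loopback` is a hypothetical helper, not part of PYNQ. The channel
    arguments are expected to behave like `dma.sendchannel` and
    `dma.recvchannel` (objects with `transfer()` and `wait()`).
    """
    # Start the memory-to-stream and stream-to-memory transfers.
    send_channel.transfer(src)
    recv_channel.transfer(dst)
    # wait() blocks until the corresponding transfer has finished.
    send_channel.wait()
    recv_channel.wait()
```

On the board this could be called as `run_loopback(dma_send, dma_recv, input_buffer, output_buffer)` in place of the two separate transfer cells.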

The next cell will print out the data received from PL (streaming FIFO) to PS (DDR memory). This should be the same as the data we sent previously.

Print first few values of buffer

for i in range(10):
    print('0x' + format(output_buffer[i], '02x'))
0xcafe0000
0xcafe0001
0xcafe0002
0xcafe0003
0xcafe0004
0xcafe0005
0xcafe0006
0xcafe0007
0xcafe0008
0xcafe0009

Verify that the arrays are equal

print("Arrays are equal: {}".format(np.array_equal(input_buffer, output_buffer)))
Arrays are equal: True
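
If the comparison ever reports False, it helps to know where the arrays differ. The following is a minimal sketch under the assumption that both buffers can be treated as NumPy arrays; first_mismatches is a hypothetical helper name, and the small arrays at the end are made-up test data rather than real DMA output.

```python
import numpy as np

def first_mismatches(sent, received, limit=5):
    """Return up to `limit` (index, sent, received) tuples where the arrays differ."""
    sent = np.asarray(sent)
    received = np.asarray(received)
    # Indices of all elements that differ, truncated to the first `limit`.
    idx = np.flatnonzero(sent != received)[:limit]
    return [(int(i), int(sent[i]), int(received[i])) for i in idx]

# Made-up data: corrupt one word to show what a report looks like.
a = np.array([0xcafe0000, 0xcafe0001, 0xcafe0002], dtype=np.uint32)
b = a.copy()
b[1] = 0
print(first_mismatches(a, b))
```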

Check DMA status, and trigger an error

Check the error and idle status

dma_recv.error?

dma_recv.error
False

dma_recv.idle?

dma_recv.idle
True

First we will start a transfer and check that the DMA is not idle. We will then try to start another DMA transfer, which should trigger an error.

dma_recv.transfer(output_buffer)
dma_recv.idle
False

Start another receive transfer while the DMA is not idle

dma_recv.transfer(output_buffer)
---------------------------------------------------------------------------

RuntimeError                              Traceback (most recent call last)

<ipython-input-19-1a476d886e01> in <module>()
----> 1 dma_recv.transfer(output_buffer)


/usr/local/lib/python3.6/dist-packages/pynq/lib/dma.py in transfer(self, array, start, nbytes)
    172             raise RuntimeError('DMA channel not started')
    173         if not self.idle and not self._first_transfer:
--> 174             raise RuntimeError('DMA channel not idle')
    175         if nbytes == 0:
    176             nbytes = array.nbytes - start


RuntimeError: DMA channel not idle
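
The traceback shows that transfer() raises a RuntimeError when the channel is busy. In a script, that exception can be caught rather than left to abort the run. The sketch below is one possible way to do this; try_transfer is a hypothetical helper name, and it relies only on the behavior visible in the traceback above.

```python
def try_transfer(channel, buffer):
    """Start a transfer, returning False instead of raising if it is refused.

    Hypothetical helper: relies only on `transfer()` raising RuntimeError,
    as seen in the traceback above.
    """
    try:
        channel.transfer(buffer)
    except RuntimeError as err:
        # Covers 'DMA channel not idle' and 'DMA channel not started'.
        print("Transfer not started:", err)
        return False
    return True
```

Catching the exception is used here instead of testing `channel.idle` first, because the code in the traceback treats the first transfer as a special case where the channel is allowed to be non-idle.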

We can check the running state of the DMA

dma_recv.running?

dma_recv.running
True

Check the DMA register map

We can read back individual status bits as shown above. It can also be useful to read back the full register map, which gives details on all control and status bits. The meaning of each register and each bit will not be covered here. For more details, refer to the product guide for the DMA.

dma.register_map
RegisterMap {
  MM2S_DMACR = Register(RS=1, Reset=0, Keyhole=0, Cyclic_BD_Enable=0, IOC_IrqEn=0, Dly_IrqEn=0, Err_IrqEn=0, IRQThreshold=1, IRQDelay=0),
  MM2S_DMASR = Register(Halted=0, Idle=1, SGIncld=0, DMAIntErr=0, DMASlvErr=0, DMADecErr=0, SGIntErr=0, SGSlvErr=0, SGDecErr=0, IOC_Irq=1, Dly_Irq=0, Err_Irq=0, IRQThresholdSts=0, IRQDelaySts=0),
  MM2S_CURDESC = Register(Current_Descriptor_Pointer=0),
  MM2S_CURDESC_MSB = Register(Current_Descriptor_Pointer=0),
  MM2S_TAILDESC = Register(Tail_Descriptor_Pointer=0),
  MM2S_TAILDESC_MSB = Register(Tail_Descriptor_Pointer=0),
  MM2S_SA = Register(Source_Address=377786368),
  MM2S_SA_MSB = Register(Source_Address=0),
  MM2S_LENGTH = Register(Length=4000),
  SG_CTL = Register(SG_CACHE=0, SG_USER=0),
  S2MM_DMACR = Register(RS=1, Reset=0, Keyhole=0, Cyclic_BD_Enable=0, IOC_IrqEn=0, Dly_IrqEn=0, Err_IrqEn=0, IRQThreshold=1, IRQDelay=0),
  S2MM_DMASR = Register(Halted=0, Idle=0, SGIncld=0, DMAIntErr=0, DMASlvErr=0, DMADecErr=0, SGIntErr=0, SGSlvErr=0, SGDecErr=0, IOC_Irq=1, Dly_Irq=0, Err_Irq=0, IRQThresholdSts=0, IRQDelaySts=0),
  S2MM_CURDESC = Register(Current_Descriptor_Pointer=0),
  S2MM_CURDESC_MSB = Register(Current_Descriptor_Pointer=0),
  S2MM_TAILDESC = Register(Tail_Descriptor_Pointer=0),
  S2MM_TAILDESC_MSB = Register(Tail_Descriptor_Pointer=0),
  S2MM_DA = Register(Destination_Address=377790464),
  S2MM_DA_MSB = Register(Destination_Address=0),
  S2MM_LENGTH = Register(Length=4000)
}
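
To illustrate how the named fields in the two DMASR entries relate to a raw 32-bit register value, here is a small decoding sketch in plain Python. The bit positions are taken from the AXI DMA product guide as best understood here; treat them as an assumption and verify them against the guide for your IP version. The names DMASR_BITS, decode_dmasr and set_flags are hypothetical.

```python
# Assumed bit positions of the flags in MM2S_DMASR / S2MM_DMASR.
# Check these against the AXI DMA product guide before relying on them.
DMASR_BITS = {
    "Halted": 0,
    "Idle": 1,
    "SGIncld": 3,
    "DMAIntErr": 4,
    "DMASlvErr": 5,
    "DMADecErr": 6,
    "SGIntErr": 8,
    "SGSlvErr": 9,
    "SGDecErr": 10,
    "IOC_Irq": 12,
    "Dly_Irq": 13,
    "Err_Irq": 14,
}

def decode_dmasr(raw):
    """Map each flag name to 0 or 1 for a raw status register value."""
    return {name: (raw >> bit) & 1 for name, bit in DMASR_BITS.items()}

def set_flags(raw):
    """Return only the names of the flags that are set."""
    return [name for name, value in decode_dmasr(raw).items() if value]

# Example raw value with bits 1 and 12 set.
print(set_flags(0x1002))
```

With these assumed positions, a raw value with only Idle and IOC_Irq set corresponds to the MM2S_DMASR line in the output above.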

As an example, we can compare the buffer (physical) addresses to the DMA source and destination addresses as shown in the register map.

print("Input buffer address   :", hex(input_buffer.physical_address))
print("Output buffer address  :", hex(output_buffer.physical_address))
print("---")
print("DMA Source address     :", hex(dma.register_map.MM2S_SA.Source_Address))
print("DMA Destination address:", hex(dma.register_map.S2MM_DA.Destination_Address))
Input buffer address   : 0x16849000
Output buffer address  : 0x1684a000
---
DMA Source address     : 0x16849000
DMA Destination address: 0x1684a000
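
The register map prints addresses and lengths in decimal, so a quick conversion makes the match easier to see. The sketch below uses only the values copied from the register map output above and the buffer parameters from this tutorial; nothing in it touches the hardware.

```python
data_size = 1000          # number of elements, as in the tutorial
itemsize = 4              # bytes per np.uint32 element

# Register values copied from the register map output above (decimal).
mm2s_sa = 377786368       # MM2S_SA Source_Address
s2mm_da = 377790464       # S2MM_DA Destination_Address
mm2s_length = 4000        # MM2S_LENGTH Length

print("Source address     :", hex(mm2s_sa))
print("Destination address:", hex(s2mm_da))
print("Address gap (bytes):", s2mm_da - mm2s_sa)
print("Expected length    :", data_size * itemsize)
```

The length register holds a byte count, which is why it reads 4000 for 1000 32-bit words. The two buffers sit 4096 bytes apart, slightly more than the 4000 bytes each one needs.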

Free all the memory buffers

Don’t forget to free the memory buffers to avoid memory leaks!

del input_buffer, output_buffer
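
The del statement removes the Python names, and the memory is then released when the buffer objects are cleaned up. As a hedged alternative, the buffers can be released explicitly. The sketch below assumes each buffer provides a freebuffer() method, as buffers returned by pynq.allocate do; release_buffers is a hypothetical helper name.

```python
def release_buffers(*buffers):
    """Explicitly free each buffer (hypothetical helper).

    Assumes every argument offers `freebuffer()`, as buffers returned
    by `pynq.allocate` do.
    """
    for buf in buffers:
        buf.freebuffer()
```

On the board this could be called as `release_buffers(input_buffer, output_buffer)` before deleting the names.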

DMA expects any streaming IP connected to the DMA (write channel) to set the AXI TLAST signal when the transaction is complete. If this is not set, the DMA will never complete the transaction. This is important when using HLS to generate the IP - the TLAST signal must be set in the C code.

This is correct, but this comment isn’t really relevant for this part of this tutorial. There is a separate tutorial, "Tutorial: using a HLS stream IP with DMA (Part 1: HLS design)", which covers TLAST.

TLAST was also mentioned when discussing the hardware design in the first part of this tutorial: PYNQ DMA tutorial (Part 1: Hardware design).

Cathal

Excuse me, I want to ask about the “RuntimeError: DMA channel not idle” problem. If I accidentally start dma_recv first, how do I stop it? I read dma.py and tried to use the stop function, but I got this message: “<bound method _SDMAChannel.stop of <pynq.lib.dma._SDMAChannel object at 0xaf5c5730>>”.

Is there a function to read back the value written to the DMA after doing the dma.send operation, to check what data is saved in the DMA?
