In [1]:
import numpy as np
from struct import pack, unpack
from typing import List
from pynq import Overlay, DefaultIP, allocate
from pynq.lib.video import VideoMode

In [2]:
def convert(a, total_bits, frac_bits):
    """Decode a two's-complement fixed-point value into a real number.

    Args:
        a: Raw value(s); only the low `total_bits` bits are used. Anything
            supporting `&`, `-` and `/` works (e.g. an int).
        total_bits: Width of the fixed-point word, sign bit included.
        frac_bits: Number of fractional bits in the word.

    Returns:
        The signed value divided by 2**frac_bits.
    """
    sign_bit = 1 << (total_bits - 1)
    magnitude_bits = sign_bit - 1
    # The sign bit carries weight -2**(total_bits - 1); subtracting it from the
    # magnitude part performs the sign extension.
    signed_value = (a & magnitude_bits) - (a & sign_bit)
    scale = 1 << frac_bits
    return signed_value / scale

def pack_float(f):
    """Return the bit pattern of `f`, packed as a C float, as an unsigned int.

    Used to write float values into integer-valued IP registers.
    """
    raw_bytes = pack('f', f)
    (bits,) = unpack('I', raw_bytes)
    return bits

class Preprocess(DefaultIP):
    """PYNQ driver for the HLS `preprocess*_accel` IP variants listed in `bindto`.

    The driver programs the frame size and the per-channel mean / std values
    into the IP's registers and owns the buffer whose device address is handed
    to the IP for its output.

    Typical use: `config(...)`, `start()`, `results()`, `stop()`.
    """
    bindto = ['xilinx.com:hls:preprocess_accel:1.0',
              'xilinx.com:hls:preprocess2_accel:1.0',
              'xilinx.com:hls:preprocess3_accel:1.0']

    def __init__(self, description):
        super().__init__(description)
        # No buffer exists until config() is called; results() and stop()
        # check for this instead of failing with an AttributeError.
        self.output_buffer = None

    def config(self, rows:int, cols:int, means:List[float], stds:List[float], buffer_dtype=np.float32):
        """Program the IP registers and (re)allocate the output buffer.

        Args:
            rows: Frame height written to register offset 28.
            cols: Frame width written to register offset 36.
            means: Exactly three values (r, g, b), written as float bit patterns.
            stds: Exactly three values (r, g, b), written as float bit patterns.
            buffer_dtype: dtype of the `(rows, cols, 4)` output buffer.

        Raises:
            ValueError: If `means` or `stds` does not hold exactly three values.

        Note:
            Calling config() again frees the previously allocated buffer, so
            arrays returned by earlier results() calls must not be used
            afterwards. Presumably this should only be done while the IP is
            stopped -- confirm against the hardware design.
        """
        r_mean, g_mean, b_mean = means
        r_std, g_std, b_std = stds

        # Configure IP for processing
        self.write(28, rows)
        self.write(36, cols)
        self.write(44, pack_float(r_mean))
        self.write(52, pack_float(g_mean))
        self.write(60, pack_float(b_mean))
        self.write(68, pack_float(r_std))
        self.write(76, pack_float(g_std))
        # NOTE(review): every other scalar register above is 8 bytes after the
        # previous one (28, 36, ..., 76), which would put b_std at 84, not 88.
        # The value is left unchanged here; confirm against the HLS-generated
        # register map before relying on the blue-channel std.
        self.write(88, pack_float(b_std))

        # Allocate buffer to retrieve output. The IP is pointed at the new
        # buffer before the old one is released, so its address register never
        # refers to memory this driver has already freed.
        previous_buffer = self.output_buffer
        self.output_buffer = allocate(shape=(rows, cols, 4), dtype=buffer_dtype)
        self.write(16, self.output_buffer.device_address)
        if previous_buffer is not None:
            # Without this, every re-configuration leaked one buffer.
            previous_buffer.freebuffer()

    def results(self):
        """Sync the output buffer from the device and return it.

        The buffer object itself is returned (no copy), so its contents can
        change on the next sync and it becomes invalid after stop().

        Raises:
            RuntimeError: If config() has not been called (or stop() already
                released the buffer).
        """
        if self.output_buffer is None:
            raise RuntimeError('No output buffer: call config() before results()')
        self.output_buffer.sync_from_device()
        return self.output_buffer #.copy()

    def start(self):
        """Start the IP by writing 0x81 to the control register at offset 0.

        NOTE(review): on an HLS ap_ctrl interface 0x81 is presumably
        ap_start | auto_restart -- confirm against the IP's register map.
        """
        self.write(0, 0x81)

    def stop(self):
        """Write 0 to the control register and release the output buffer.

        Safe to call before config() or more than once: the buffer is freed at
        most one time.
        """
        self.write(0, 0)
        if self.output_buffer is not None:
            self.output_buffer.freebuffer()
            # Drop the reference so a second stop() cannot free it again.
            self.output_buffer = None

In [3]:
# Load the overlay described by 'preprocess8.bit'. The IPs used below
# (ol.vdma, ol.preprocess3_accel_0) are accessed as attributes of this object.
ol = Overlay('preprocess8.bit')

In [4]:
# Inspect the IP blocks the overlay reports (type, address range, parameters)
# to check that the expected IPs are present.
ol.ip_dict

{'axi_intc_0': {'fullpath': 'axi_intc_0',
  'type': 'xilinx.com:ip:axi_intc:4.1',
  'bdtype': None,
  'state': None,
  'addr_range': 65536,
  'phys_addr': 2147483648,
  'mem_id': 's_axi',
  'memtype': 'REGISTER',
  'gpio': {},
  'interrupts': {},
  'parameters': {'C_FAMILY': 'zynquplus',
   'C_INSTANCE': 'design_1_axi_intc_0_0',
   'C_S_AXI_ADDR_WIDTH': '9',
   'C_S_AXI_DATA_WIDTH': '32',
   'C_NUM_INTR_INPUTS': '2',
   'C_NUM_SW_INTR': '0',
   'C_KIND_OF_INTR': '0xfffffffc',
   'C_KIND_OF_EDGE': '0xFFFFFFFF',
   'C_KIND_OF_LVL': '0xFFFFFFFF',
   'C_ASYNC_INTR': '0xFFFFFFFC',
   'C_NUM_SYNC_FF': '2',
   'C_ADDR_WIDTH': '32',
   'C_IVAR_RESET_VALUE': '0x0000000000000010',
   'C_ENABLE_ASYNC': '0',
   'C_HAS_IPR': '1',
   'C_HAS_SIE': '1',
   'C_HAS_CIE': '1',
   'C_HAS_IVR': '1',
   'C_HAS_ILR': '0',
   'C_IRQ_IS_LEVEL': '1',
   'C_IRQ_ACTIVE': '0x1',
   'C_DISABLE_SYNCHRONIZERS': '0',
   'C_MB_CLK_NOT_CONNECTED': '1',
   'C_HAS_FAST': '0',
   'C_EN_CASCADE_MODE': '0',
   'C_CASCADE_MAS

In [5]:
# Size of the test frame. These values are shared by the VDMA video mode,
# the preprocess IP configuration and the generated test image.
input_w = 5 # cols
input_h = 5 # rows

In [6]:
# Configure VDMA
# Only the write channel is used in this notebook; frames are pushed into it
# from Python. The mode is input_w x input_h at 24 bits per pixel, matching
# the 3-channel uint8 test image created further down.
vdma = ol.vdma
vdma_in = vdma.writechannel
vdma_in.mode = VideoMode(input_w, input_h, 24)

In [7]:
# Configure preprocessing IP
preprocess = ol.preprocess3_accel_0

# Means of 0 and stds of 1 are presumably an identity normalisation, which
# would make the output directly comparable with the input -- confirm against
# the IP's actual formula.
means = np.array([0.0, 0.0, 0.0])
stds = np.array([1.0, 1.0, 1.0])

# config() takes (rows, cols, ...), hence height first; it also allocates the
# (rows, cols, 4) float32 output buffer.
preprocess.config(input_h, input_w, means, stds)
# preprocess.config(input_h, input_w, means, stds, buffer_dtype=np.uint32)

# NOTE(review): this writes 0 to register offset 4. The original comment said
# "Enable interrupt", but on an HLS s_axilite control interface offset 4 is
# presumably the global interrupt enable, where 0 disables it -- confirm.
preprocess.write(4, 0)

In [8]:
# Start IPs
# The VDMA write channel is started first, then the preprocess IP
# (preprocess.start() writes 0x81 to its control register at offset 0).
vdma_in.start()
preprocess.start()

In [9]:
# Read back the control register (offset 0) in binary; start() wrote
# 0x81 == 0b10000001 to it.
# NOTE(review): any extra bit set in the readback is presumably an HLS status
# flag such as ap_done -- confirm against the IP's register map.
bin(preprocess.read(0))

'0b10000011'

In [10]:
# Test image: a ramp 0, 1, 2, ... laid out as (rows, cols, 3 channels), so
# every pixel/channel value is unique and easy to trace in the output.
# With the 5x5 frame used here the values 0..74 fit in uint8.
input_data = np.arange(input_h * input_w * 3, dtype=np.uint8).reshape((input_h, input_w, 3))
input_data

array([[[ 0,  1,  2],
        [ 3,  4,  5],
        [ 6,  7,  8],
        [ 9, 10, 11],
        [12, 13, 14]],

       [[15, 16, 17],
        [18, 19, 20],
        [21, 22, 23],
        [24, 25, 26],
        [27, 28, 29]],

       [[30, 31, 32],
        [33, 34, 35],
        [36, 37, 38],
        [39, 40, 41],
        [42, 43, 44]],

       [[45, 46, 47],
        [48, 49, 50],
        [51, 52, 53],
        [54, 55, 56],
        [57, 58, 59]],

       [[60, 61, 62],
        [63, 64, 65],
        [66, 67, 68],
        [69, 70, 71],
        [72, 73, 74]]], dtype=uint8)

In [11]:
# Send data to VDMA
# Get a frame from the write channel, copy the test image into it, then hand
# the frame to the channel.
in_frame = vdma_in.newframe()
in_frame[:, :, :] = input_data 
vdma_in.writeframe(in_frame)

In [12]:
# results() syncs the output buffer from the device and returns the buffer
# object itself (not a copy), so out_data aliases the driver's buffer.
out_data = preprocess.results()
# Convert fixed point to float
# out_data = convert(out_data, 16, 8)
out_data

PynqBuffer([[[34., 16.,  0.,  0.],
             [31., 25.,  0.,  0.],
             [63., 57.,  0.,  0.],
             [28., 22.,  0.,  0.],
             [69., 61.,  0.,  0.]],

            [[37., 22.,  0.,  0.],
             [69., 61.,  0.,  0.],
             [34., 19.,  0.,  0.],
             [66., 48.,  0.,  0.],
             [31., 25.,  0.,  0.]],

            [[66.,  1.,  0.,  0.],
             [69., 29.,  0.,  0.],
             [60., 54.,  0.,  0.],
             [ 3., 70.,  0.,  0.],
             [50., 44.,  0.,  0.]],

            [[34., 13.,  0.,  0.],
             [66., 48.,  0.,  0.],
             [15., 10.,  0.,  0.],
             [40., 25.,  0.,  0.],
             [63., 48.,  0.,  0.]],

            [[59., 51.,  0.,  0.],
             [15., 10.,  0.,  0.],
             [63., 32.,  0.,  0.],
             [72., 57.,  0.,  0.],
             [21., 13.,  0.,  0.]]], dtype=float32)

In [13]:
# Display the same buffer object again. No sync_from_device() is issued in
# this cell, and out_data is the driver's buffer rather than a copy.
out_data

PynqBuffer([[[50., 73.,  0.,  0.],
             [72., 22.,  0.,  0.],
             [12., 73.,  0.,  0.],
             [37., 22.,  0.,  0.],
             [60., 54.,  0.,  0.]],

            [[12., 64.,  0.,  0.],
             [28., 22.,  0.,  0.],
             [60., 45.,  0.,  0.],
             [18., 13.,  0.,  0.],
             [50., 44.,  0.,  0.]],

            [[ 3., 61.,  0.,  0.],
             [34., 19.,  0.,  0.],
             [66., 48.,  0.,  0.],
             [24., 10.,  0.,  0.],
             [56., 48.,  0.,  0.]],

            [[ 0., 67.,  0.,  0.],
             [12.,  4.,  0.,  0.],
             [43., 35.,  0.,  0.],
             [ 0.,  1.,  0.,  0.],
             [47., 32.,  0.,  0.]],

            [[59., 35.,  0.,  0.],
             [ 9.,  1.,  0.,  0.],
             [40., 32.,  0.,  0.],
             [ 6., 73.,  0.,  0.],
             [37., 29.,  0.,  0.]]], dtype=float32)

In [14]:
# Tear down. preprocess.stop() writes 0 to the control register and frees the
# output buffer, so out_data (which is that buffer) must not be used after
# this cell.
preprocess.stop()
vdma_in.stop()
del ol

### Debugging helper functions

In [None]:
from struct import pack, unpack

In [None]:
def float_to_hex(f):
    """Return the bit pattern of `f`, packed as a C float, as a hex string.

    Example: 1.0 -> '0x3f800000'.
    """
    (bits,) = unpack('I', pack('f', f))
    return hex(bits)

def hex_to_float(h):
    """Reinterpret the unsigned integer `h` as the bit pattern of a C float.

    Inverse of float_to_hex once the hex string has been parsed to an int,
    e.g. 0x3f800000 -> 1.0.
    """
    raw_bytes = pack('I', h)
    (value,) = unpack('f', raw_bytes)
    return value