飛凌嵌入式iMX8MP 開發板試用體驗--編譯內核源碼&編譯DPDK源碼實現rte_ring無鎖環隊列進程間通信

原創 作者 donatello1996 2021-11-23 14:19:00 iMX8MP

來源: 電子發燒友   作者:donatello1996


FETMX8MP-C核心板基于NXP i.MX 8M Plus處理器開發設計,該系列處理器專注于機器學習與視覺、高級多媒體以及具有高可靠性的工業自動化。旨在滿足智慧城市工業互聯網、智能醫療智慧交通等應用的需求。 ·強大的四核或雙核Arm? Cortex?-A53處理器,主頻高達1.6GHz,帶有神經處理 單元(NPU),最高運行速率可達2.3 TOPS。

iMX8MP核心板

本文主要介紹編譯內核源碼&編譯DPDK源碼實現rte_ring無鎖環隊列進程間通信。


正文開始:


下載并解壓飛凌廠商提供的iMX8MP內核源碼壓縮包分卷:


在虛擬機中合并壓縮分卷并解壓得出內核源碼包文件夾OK8MP-linux-kernel,將文件夾使用tar打包并復制到TF卡文件系統中解壓:


找到內核源碼中的配置文件OK8MP-C_defconfig:



這個就是make選項,使用

make OK8MP-C_defconfig

指令即可配置編譯選項:

make -j4

開始編譯:

注意開始編譯前需要安裝常用軟件:

apt install bison bc flex




增量編譯完畢:



然后接下來就可以下載DPDK并運行rte_ring無鎖環隊列Demo應用,需要從

https://www.dpdk.org/

官網中下載DPDK 19.11.10 (LTS)長期支持版本:


在根目錄下的mk/文件夾下找到名為rte_vars.mk設置文件,找到環境變量RTE_KERNELDIR,修改為上述的內核源碼路徑:

1.RTE_KERNELDIR ?= /home/OK8MP-linux-kernel/

進入usertools文件夾,找到dpdk-setup.sh腳本并運行

選擇8,ARM64-armv8a-linuxapp-gcc,

這個選項會使dpdk的gcc交叉編譯鏈生成適用于armv8a處理器的外部庫,外部庫中有kmod和lib等ko文件和so文件,

是用于第三方程序開發和運行的:

使用指令

insmod /home/dpdk-stable-19.11.10/arm64-armv8a-linuxapp-gcc/kmod/igb_uio.ko

加載igb_uio.ko驅動文件,這是進行dpdk開發必備的步驟:

然后是使用dpdk-devbind.py腳本手動進行hugepage大頁內存綁定,此處為numa方式:

此舉會將/mnt/huge文件mount成hugepage映射文件,并實實在在地占用內存空間:



準備工作完成,我們接下來可以進行rte_ring無鎖環隊列Demo代碼的編寫,但是在編寫之前,需要對無鎖環隊列有一個基本的認識:https://blog.csdn.net/chen98765432101/article/details/69367633
無論是dpdk第三方開發的rte_ring還是Linux內核中本就存在的無鎖環隊列,其基本原理類似,在一條分配好的隊列型內存空間中,讀寫方式為FIFO(先進先出),讀和寫的動作分別有兩個進程或兩個線程進行,寫進程不斷往地址自增的內存位置寫入數據,讀進程不斷讀取地址自增的內存位置的數據,當寫位置的內存地址已為隊列中內存的最高值時,需要釋放隊列中內存地址最低值的空間供寫進程繼續寫,方式仍與上一周期相同(不斷往地址自增的內存位置寫入數據),釋放過程需要保證對末尾內存地址空間的鎖定與解鎖,避免讀寫過程出錯。而不同的是,Linux內核中的無鎖環隊列,地址管理和讀寫控制均由內核進行,而dpdk的rte_ring則由dpdk內部的控制器進行,因為dpdk這一模塊完整地接管了所分配內存空間的管理權,是直接繞過Linux內核進行管理的,內核也無權訪問dpdk控制器的具體管理細節。

編寫無鎖環隊列兩個進程的Demo,先寫Primary進程:


#include <stdio.h>

#include <string.h>

#include <stdint.h>

#include <stdlib.h>

#include <inttypes.h>

#include <stdarg.h>

#include <errno.h>

#include <unistd.h>

#include <termios.h>

#include <sys/queue.h>


#include <rte_mempool.h>

#include <rte_lcore.h>


#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1


static const char *_MSG_POOL = "MSG_POOL";

static const char *_SEC_2_PRI = "SEC_2_PRI";

static const char *_PRI_2_SEC = "PRI_2_SEC";

static const char *_PRI_2_THI = "PRI_2_THI";


struct rte_ring *send_ring, *recv_ring , *send_ring_third;

struct rte_mempool *message_pool;

volatile int quit = 0;


static void * lcore_recv(void *arg)

{

unsigned lcore_id = rte_lcore_id();


printf("Starting core %u\n", lcore_id);

while (!quit){

void *msg;

if (rte_ring_dequeue(recv_ring, &msg) < 0)

{

usleep(5);

continue;

}

printf("lcore_id = %d Received '%s'\n" , lcore_id , (char *)msg);

rte_mempool_put(message_pool , msg);

}


return 0;

}


int string_size = 100;

int elt_size = 128;

pthread_t id1;


int main(int argc, char **argv)

{

const unsigned flags = 0;

const unsigned ring_size = 64;

const unsigned pool_size = 1024;

const unsigned pool_cache = 32;

const unsigned priv_data_sz = 0;


int ret;

unsigned lcore_id;


ret = rte_eal_init(argc, argv);

if (ret < 0)

rte_exit(EXIT_FAILURE, "Cannot init EAL\n");


send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags);

recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags);

send_ring_third = rte_ring_create(_PRI_2_THI, ring_size, rte_socket_id(), flags);

message_pool = rte_mempool_create(_MSG_POOL, pool_size,

elt_size, pool_cache, priv_data_sz,

NULL, NULL, NULL, NULL,

rte_socket_id(), flags);


if (send_ring == NULL)

rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");


if (recv_ring == NULL)

rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");


if (send_ring_third == NULL)

rte_exit(EXIT_FAILURE, "Problem getting send_ring_third\n");


if (message_pool == NULL)

rte_exit(EXIT_FAILURE, "Problem getting message pool\n");


pthread_create(&id1 , NULL , lcore_recv , NULL);

while(1)

{

void *msg = NULL;

if (rte_mempool_get(message_pool, &msg) < 0)

continue;


snprintf((char *)msg, string_size, "%s", "primary to secondary");

if (rte_ring_enqueue(send_ring , msg) < 0)

{

rte_mempool_put(message_pool, msg);

}


if (rte_mempool_get(message_pool, &msg) < 0)

continue;


snprintf((char *)msg, string_size, "%s", "primary to third");

if (rte_ring_enqueue(send_ring_third , msg) < 0)

{

rte_mempool_put(message_pool, msg);

}


sleep(1);

}


return 0;

}


注意在Makefile文件里面要關閉WERROR相關編譯選項:

#   BSD LICENSE

#

#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.

#   All rights reserved.

#

#   Redistribution and use in source and binary forms, with or without

#   modification, are permitted provided that the following conditions

#   are met:

#

#     * Redistributions of source code must retain the above copyright

#       notice, this list of conditions and the following disclaimer.

#     * Redistributions in binary form must reproduce the above copyright

#       notice, this list of conditions and the following disclaimer in

#       the documentation and/or other materials provided with the

#       distribution.

#     * Neither the name of Intel Corporation nor the names of its

#       contributors may be used to endorse or promote products derived

#       from this software without specific prior written permission.

#

#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS

#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT

#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR

#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT

#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,

#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT

#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,

#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY

#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT

#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE

#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


ifeq ($(RTE_SDK),)

$(error "Please define RTE_SDK environment variable")

endif


# Default target, can be overridden by command line or environment

RTE_TARGET ?= arm64-armv8a-linuxapp-gcc


include $(RTE_SDK)/mk/rte.vars.mk


# binary name

APP = rte_ring_primary


# all source are stored in SRCS-y

SRCS-y := main.c


CFLAGS += -O0

CFLAGS +=


include $(RTE_SDK)/mk/rte.extapp.mk


Secondary進程:

#include <stdio.h>

#include <string.h>

#include <stdint.h>

#include <stdlib.h>

#include <inttypes.h>

#include <stdarg.h>

#include <errno.h>

#include <unistd.h>

#include <termios.h>

#include <sys/queue.h>


#include <rte_mempool.h>

#include <rte_lcore.h>


#define RTE_LOGTYPE_APP RTE_LOGTYPE_USER1


static const char *_MSG_POOL = "MSG_POOL";

static const char *_SEC_2_PRI = "SEC_2_PRI";

static const char *_PRI_2_SEC = "PRI_2_SEC";


struct rte_ring *send_ring, *recv_ring;

struct rte_mempool *message_pool;

volatile int quit = 0;

int string_size = 100;


static int lcore_send(__attribute__((unused)) void *arg)

{

unsigned lcore_id = rte_lcore_id();


while(1)

{

void *msg = NULL;

if (rte_mempool_get(message_pool, &msg) < 0)

continue;


snprintf((char *)msg , string_size , "%s", "secondary to primary");

if (rte_ring_enqueue(send_ring , msg) < 0)

{

rte_mempool_put(message_pool, msg);

}

sleep(1);

}

return 0;

}


pthread_t id1;


int main(int argc, char **argv)

{

const unsigned flags = 0;

const unsigned ring_size = 64;

const unsigned pool_size = 1024;

const unsigned pool_cache = 32;

const unsigned priv_data_sz = 0;


int ret;

unsigned lcore_id;

ret = rte_eal_init(argc, argv);

if (ret < 0)

rte_exit(EXIT_FAILURE, "Cannot init EAL\n");


recv_ring = rte_ring_lookup(_PRI_2_SEC);

send_ring = rte_ring_lookup(_SEC_2_PRI);

message_pool = rte_mempool_lookup(_MSG_POOL);


if (send_ring == NULL)

rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");

if (recv_ring == NULL)

rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");

if (message_pool == NULL)

rte_exit(EXIT_FAILURE, "Problem getting message pool\n");


pthread_create(&id1 , NULL , lcore_send , NULL);

while (1)

{

lcore_id = rte_lcore_id();

void * msg = NULL;

if (rte_ring_dequeue(recv_ring, &msg) < 0)

{

usleep(5);

continue;

}


printf("lcore_id = %d Received: %s\n" , lcore_id , (char *)msg);


rte_mempool_put(message_pool, msg);

}


return 0;

}


同樣在Makefile文件里面要關閉WERROR相關編譯選項:

#   BSD LICENSE

#

#   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.

#   All rights reserved.

#

#   Redistribution and use in source and binary forms, with or without

#   modification, are permitted provided that the following conditions

#   are met:

#

#     * Redistributions of source code must retain the above copyright

#       notice, this list of conditions and the following disclaimer.

#     * Redistributions in binary form must reproduce the above copyright

#       notice, this list of conditions and the following disclaimer in

#       the documentation and/or other materials provided with the

#       distribution.

#     * Neither the name of Intel Corporation nor the names of its

#       contributors may be used to endorse or promote products derived

#       from this software without specific prior written permission.

#

#   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS

#   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT

#   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR

#   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT

#   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,

#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT

#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,

#   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY

#   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT

#   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE

#   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


ifeq ($(RTE_SDK),)

$(error "Please define RTE_SDK environment variable")

endif


# Default target, can be overridden by command line or environment

RTE_TARGET ?= arm64-armv8a-linuxapp-gcc


include $(RTE_SDK)/mk/rte.vars.mk


# binary name

APP = rte_ring_secondary


# all source are stored in SRCS-y

SRCS-y := main.c


CFLAGS += -O3

CFLAGS += $()


include $(RTE_SDK)/mk/rte.extapp.mk


運行,這里說一下,基于rte_ring的進程間通信,Secondary進程最好是使用auto類型:

./rte_ring_primary --proc-type primary

./rte_ring_secondary --proc-type auto

運行效果:

相關產品 >

  • OKMX8MP-C開發板

    內置NPU、ISP,AI計算能力高達2.3TOPS|飛凌嵌入式i.MX8MP 系列-NXP iMX8M Plus 開發板 基于高性能低功耗工業級iMX8MP核心板設計,支持多種多種高速通信接口。iMX8MP開發板內置NPU,AI計算能力2.3TOPS,支持4K,支持雙圖像信號處理器(ISP),是一款支持LinuxQT/android操作系統的iMX8MP開發板。

    了解詳情
    OKMX8MP-C開發板
  • FETMX8MP-C核心板

    iMX8MP核心板基于 NXP  i.MX 8M Plus 處理器設計,  采用4核Cortex-A53 和 Cortex-M7架構。支持雙千兆網口,iMX8MP性能強勁最高運行速率可達2.3TOPS,并且i.MX8MP功耗更低≤2W 。iMX 8M Plus系列專注于機器學習和視覺、高級多媒體以及具有高可靠性的工業自動化。它旨在滿足智慧家庭、樓宇、城市和工業4.0應用的需求。飛凌iMX8MP核心板提供用戶手冊,iMX8MP原理圖,引腳定義等。
    了解詳情
    FETMX8MP-C核心板

推薦閱讀 換一批 換一批