@@ -368,11 +368,8 @@ struct server_data {
368368 /* resources for messaging buffer from DRAM allocated by fio */
369369 struct rpma_mr_local * msg_mr ;
370370
371- uint32_t msg_sqe_available ; /* # of free SQ slots */
372-
373- /* in-memory queues */
374- struct ibv_wc * msgs_queued ;
375- uint32_t msg_queued_nr ;
371+ /* receive CQ */
372+ struct rpma_cq * rcq ;
376373};
377374
378375static int server_init (struct thread_data * td )
@@ -393,13 +390,6 @@ static int server_init(struct thread_data *td)
393390 goto err_server_cleanup ;
394391 }
395392
396- /* allocate in-memory queue */
397- sd -> msgs_queued = calloc (td -> o .iodepth , sizeof (* sd -> msgs_queued ));
398- if (sd -> msgs_queued == NULL ) {
399- td_verror (td , errno , "calloc" );
400- goto err_free_sd ;
401- }
402-
403393 /*
404394 * Assure a single io_u buffer can store both SEND and RECV messages and
405395 * an io_us buffer allocation is page-size-aligned which is required
@@ -412,9 +402,6 @@ static int server_init(struct thread_data *td)
412402
413403 return 0 ;
414404
415- err_free_sd :
416- free (sd );
417-
418405err_server_cleanup :
419406 librpma_fio_server_cleanup (td );
420407
@@ -485,7 +472,6 @@ static void server_cleanup(struct thread_data *td)
485472 if ((ret = rpma_mr_dereg (& sd -> msg_mr )))
486473 librpma_td_verror (td , ret , "rpma_mr_dereg" );
487474
488- free (sd -> msgs_queued );
489475 free (sd );
490476 }
491477
@@ -501,7 +487,6 @@ static int prepare_connection(struct thread_data *td,
501487 int i ;
502488
503489 /* prepare buffers for a flush requests */
504- sd -> msg_sqe_available = td -> o .iodepth ;
505490 for (i = 0 ; i < td -> o .iodepth ; i ++ ) {
506491 size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER (i ) + RECV_OFFSET ;
507492 if ((ret = rpma_conn_req_recv (conn_req , sd -> msg_mr ,
@@ -518,6 +503,7 @@ static int prepare_connection(struct thread_data *td,
518503static int server_open_file (struct thread_data * td , struct fio_file * f )
519504{
520505 struct librpma_fio_server_data * csd = td -> io_ops_data ;
506+ struct server_data * sd = csd -> server_data ;
521507 struct rpma_conn_cfg * cfg = NULL ;
522508 uint16_t max_msg_num = td -> o .iodepth ;
523509 int ret ;
@@ -536,8 +522,10 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
536522 * all possible flush requests (SENDs)
537523 * - the receive queue (RQ) has to be big enough to accommodate
538524 * all flush responses (RECVs)
539- * - the completion queue (CQ) has to be big enough to accommodate
540- * all success and error completions (sq_size + rq_size)
525+ * - the main completion queue (CQ) has to be big enough to
526+ * accommodate all success and error completions (sq_size)
527+ * - the receive completion queue (RCQ) has to be big enough to
528+ * accommodate all success and error completions (rq_size)
541529 */
542530 if ((ret = rpma_conn_cfg_set_sq_size (cfg , max_msg_num ))) {
543531 librpma_td_verror (td , ret , "rpma_conn_cfg_set_sq_size" );
@@ -547,12 +535,24 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
547535 librpma_td_verror (td , ret , "rpma_conn_cfg_set_rq_size" );
548536 goto err_cfg_delete ;
549537 }
550- if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num * 2 ))) {
538+ if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num ))) {
551539 librpma_td_verror (td , ret , "rpma_conn_cfg_set_cq_size" );
552540 goto err_cfg_delete ;
553541 }
554542
555- ret = librpma_fio_server_open_file (td , f , cfg );
543+ if ((ret = rpma_conn_cfg_set_rcq_size (cfg , max_msg_num ))) {
544+ librpma_td_verror (td , ret , "rpma_conn_cfg_set_rcq_size" );
545+ goto err_cfg_delete ;
546+ }
547+
548+ if ((ret = librpma_fio_server_open_file (td , f , cfg ))) {
549+ librpma_td_verror (td , ret , "librpma_fio_server_open_file" );
550+ goto err_cfg_delete ;
551+ }
552+
553+ /* get the connection's receive CQ */
554+ if ((ret = rpma_conn_get_rcq (csd -> conn , & sd -> rcq )))
555+ librpma_td_verror (td , ret , "rpma_conn_get_rcq" );
556556
557557err_cfg_delete :
558558 (void ) rpma_conn_cfg_delete (& cfg );
@@ -630,7 +630,6 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
630630 librpma_td_verror (td , ret , "rpma_send" );
631631 goto err_free_unpacked ;
632632 }
633- -- sd -> msg_sqe_available ;
634633
635634 gpspm_flush_request__free_unpacked (flush_req , NULL );
636635
@@ -645,48 +644,17 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
645644 return -1 ;
646645}
647646
648- static inline int server_queue_process (struct thread_data * td )
649- {
650- struct librpma_fio_server_data * csd = td -> io_ops_data ;
651- struct server_data * sd = csd -> server_data ;
652- int ret ;
653- int i ;
654-
655- /* min(# of queue entries, # of SQ entries available) */
656- uint32_t qes_to_process = min (sd -> msg_queued_nr , sd -> msg_sqe_available );
657- if (qes_to_process == 0 )
658- return 0 ;
659-
660- /* process queued completions */
661- for (i = 0 ; i < qes_to_process ; ++ i ) {
662- if ((ret = server_qe_process (td , & sd -> msgs_queued [i ])))
663- return ret ;
664- }
665-
666- /* progress the queue */
667- for (i = 0 ; i < sd -> msg_queued_nr - qes_to_process ; ++ i ) {
668- memcpy (& sd -> msgs_queued [i ],
669- & sd -> msgs_queued [qes_to_process + i ],
670- sizeof (sd -> msgs_queued [i ]));
671- }
672-
673- sd -> msg_queued_nr -= qes_to_process ;
674-
675- return 0 ;
676- }
677-
678- static int server_cmpl_process (struct thread_data * td )
647+ static int server_cmpl_process (struct thread_data * td , struct rpma_cq * cq , struct ibv_wc * wc )
679648{
680- struct librpma_fio_server_data * csd = td -> io_ops_data ;
681- struct server_data * sd = csd -> server_data ;
682- struct ibv_wc * wc = & sd -> msgs_queued [sd -> msg_queued_nr ];
683649 struct librpma_fio_options_values * o = td -> eo ;
684650 int ret ;
685651
686- ret = rpma_cq_get_wc (csd -> cq , 1 , wc , NULL );
652+ memset (wc , 0 , sizeof (struct ibv_wc ));
653+
654+ ret = rpma_cq_get_wc (cq , 1 , wc , NULL );
687655 if (ret == RPMA_E_NO_COMPLETION ) {
688656 if (o -> busy_wait_polling == 0 ) {
689- ret = rpma_cq_wait (csd -> cq );
657+ ret = rpma_cq_wait (cq );
690658 if (ret == RPMA_E_NO_COMPLETION ) {
691659 /* lack of completion is not an error */
692660 return 0 ;
@@ -695,7 +663,7 @@ static int server_cmpl_process(struct thread_data *td)
695663 goto err_terminate ;
696664 }
697665
698- ret = rpma_cq_get_wc (csd -> cq , 1 , wc , NULL );
666+ ret = rpma_cq_get_wc (cq , 1 , wc , NULL );
699667 if (ret == RPMA_E_NO_COMPLETION ) {
700668 /* lack of completion is not an error */
701669 return 0 ;
@@ -716,10 +684,8 @@ static int server_cmpl_process(struct thread_data *td)
716684 if (wc -> status != IBV_WC_SUCCESS )
717685 goto err_terminate ;
718686
719- if (wc -> opcode == IBV_WC_RECV )
720- ++ sd -> msg_queued_nr ;
721- else if (wc -> opcode == IBV_WC_SEND )
722- ++ sd -> msg_sqe_available ;
687+ if (wc -> opcode != IBV_WC_RECV && wc -> opcode != IBV_WC_SEND )
688+ goto err_terminate ;
723689
724690 return 0 ;
725691
@@ -729,12 +695,35 @@ static int server_cmpl_process(struct thread_data *td)
729695 return -1 ;
730696}
731697
698+ static inline int server_queue_process (struct thread_data * td )
699+ {
700+ struct librpma_fio_server_data * csd = td -> io_ops_data ;
701+ struct server_data * sd = csd -> server_data ;
702+ struct ibv_wc wc = {0 };
703+ int ret ;
704+
705+ /* process the receive completion */
706+ ret = server_cmpl_process (td , sd -> rcq , & wc );
707+ if (ret )
708+ return ret ;
709+
710+ if (wc .opcode == IBV_WC_RECV ) {
711+ ret = server_qe_process (td , & wc );
712+ if (ret )
713+ return ret ;
714+ }
715+
716+ /* process the send completion */
717+ ret = server_cmpl_process (td , csd -> cq , & wc );
718+ if (ret )
719+ return ret ;
720+
721+ return 0 ;
722+ }
723+
732724static enum fio_q_status server_queue (struct thread_data * td , struct io_u * io_u )
733725{
734726 do {
735- if (server_cmpl_process (td ))
736- return FIO_Q_BUSY ;
737-
738727 if (server_queue_process (td ))
739728 return FIO_Q_BUSY ;
740729
0 commit comments