@@ -361,26 +361,36 @@ FIO_STATIC struct ioengine_ops ioengine_client = {
361361
/*
 * Byte offset of the i-th io_u message buffer within the server's
 * messaging area. The argument is parenthesized so expressions like
 * IO_U_BUFF_OFF_SERVER(a + b) expand correctly; the stray space between
 * the macro name and its parameter list (which would have made this an
 * object-like macro) is removed.
 */
#define IO_U_BUFF_OFF_SERVER(i) ((i) * IO_U_BUF_LEN)
363363
/*
 * Two completion-processing strategies; server_init() selects one of them
 * (based on the busy_wait_polling engine option) and stores it in
 * server_cmpl_process_func.
 */
static int server_cmpl_poll(struct thread_data *td,
		struct rpma_cq *cq, struct ibv_wc *wc);
static int server_cmpl_wait_and_poll(struct thread_data *td,
		struct rpma_cq *cq, struct ibv_wc *wc);

/* the selected completion-processing function (set once in server_init()) */
static int (*server_cmpl_process_func)(struct thread_data *td,
		struct rpma_cq *cq, struct ibv_wc *wc);
/* per-thread server-side state */
struct server_data {
	/* aligned td->orig_buffer */
	char *orig_buffer_aligned;

	/* resources for messaging buffer from DRAM allocated by fio */
	struct rpma_mr_local *msg_mr;

	/* # of free SQ slots */
	uint32_t msg_sqe_available;

	/* receive CQ (obtained from the connection in server_open_file()) */
	struct rpma_cq *rcq;
};
377383
378384static int server_init (struct thread_data * td )
379385{
386+ struct librpma_fio_options_values * o = td -> eo ;
380387 struct librpma_fio_server_data * csd ;
381388 struct server_data * sd ;
382389 int ret = -1 ;
383390
391+ server_cmpl_process_func = o -> busy_wait_polling ? server_cmpl_poll :
392+ server_cmpl_wait_and_poll ;
393+
384394 if ((ret = librpma_fio_server_init (td )))
385395 return ret ;
386396
@@ -393,13 +403,6 @@ static int server_init(struct thread_data *td)
393403 goto err_server_cleanup ;
394404 }
395405
396- /* allocate in-memory queue */
397- sd -> msgs_queued = calloc (td -> o .iodepth , sizeof (* sd -> msgs_queued ));
398- if (sd -> msgs_queued == NULL ) {
399- td_verror (td , errno , "calloc" );
400- goto err_free_sd ;
401- }
402-
403406 /*
404407 * Assure a single io_u buffer can store both SEND and RECV messages and
405408 * an io_us buffer allocation is page-size-aligned which is required
@@ -412,9 +415,6 @@ static int server_init(struct thread_data *td)
412415
413416 return 0 ;
414417
415- err_free_sd :
416- free (sd );
417-
418418err_server_cleanup :
419419 librpma_fio_server_cleanup (td );
420420
@@ -485,7 +485,6 @@ static void server_cleanup(struct thread_data *td)
485485 if ((ret = rpma_mr_dereg (& sd -> msg_mr )))
486486 librpma_td_verror (td , ret , "rpma_mr_dereg" );
487487
488- free (sd -> msgs_queued );
489488 free (sd );
490489 }
491490
@@ -518,6 +517,7 @@ static int prepare_connection(struct thread_data *td,
518517static int server_open_file (struct thread_data * td , struct fio_file * f )
519518{
520519 struct librpma_fio_server_data * csd = td -> io_ops_data ;
520+ struct server_data * sd = csd -> server_data ;
521521 struct rpma_conn_cfg * cfg = NULL ;
522522 uint16_t max_msg_num = td -> o .iodepth ;
523523 int ret ;
@@ -531,13 +531,15 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
531531 }
532532
533533 /*
534- * Calculate the required queue sizes where :
534+ * The required queue sizes are :
535535 * - the send queue (SQ) has to be big enough to accommodate
536536 * all possible flush requests (SENDs)
537537 * - the receive queue (RQ) has to be big enough to accommodate
538538 * all flush responses (RECVs)
539- * - the completion queue (CQ) has to be big enough to accommodate
540- * all success and error completions (sq_size + rq_size)
539+ * - the main completion queue (CQ) has to be big enough to
540+ * accommodate all success and error completions (sq_size)
541+ * - the receive completion queue (RCQ) has to be big enough to
542+ * accommodate all success and error completions (rq_size)
541543 */
542544 if ((ret = rpma_conn_cfg_set_sq_size (cfg , max_msg_num ))) {
543545 librpma_td_verror (td , ret , "rpma_conn_cfg_set_sq_size" );
@@ -547,12 +549,21 @@ static int server_open_file(struct thread_data *td, struct fio_file *f)
547549 librpma_td_verror (td , ret , "rpma_conn_cfg_set_rq_size" );
548550 goto err_cfg_delete ;
549551 }
550- if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num * 2 ))) {
552+ if ((ret = rpma_conn_cfg_set_cq_size (cfg , max_msg_num ))) {
551553 librpma_td_verror (td , ret , "rpma_conn_cfg_set_cq_size" );
552554 goto err_cfg_delete ;
553555 }
556+ if ((ret = rpma_conn_cfg_set_rcq_size (cfg , max_msg_num ))) {
557+ librpma_td_verror (td , ret , "rpma_conn_cfg_set_rcq_size" );
558+ goto err_cfg_delete ;
559+ }
554560
555- ret = librpma_fio_server_open_file (td , f , cfg );
561+ if ((ret = librpma_fio_server_open_file (td , f , cfg )))
562+ goto err_cfg_delete ;
563+
564+ /* get the connection's receive CQ */
565+ if ((ret = rpma_conn_get_rcq (csd -> conn , & sd -> rcq )))
566+ librpma_td_verror (td , ret , "rpma_conn_get_rcq" );
556567
557568err_cfg_delete :
558569 (void ) rpma_conn_cfg_delete (& cfg );
@@ -645,65 +656,26 @@ static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
645656 return -1 ;
646657}
647658
648- static inline int server_queue_process (struct thread_data * td )
/*
 * server_cmpl_poll - poll and process a completion
 *
 * Non-blocking: polls the given completion queue for at most one work
 * completion. A successful SEND completion frees one SQ slot
 * (sd->msg_sqe_available). An unsuccessful completion is treated as a
 * fatal error and terminates the thread.
 *
 * Return value:
 * 0 or 1 - number of received completions
 * -1 - in case of an error (td->terminate is set)
 */
static int server_cmpl_poll(struct thread_data *td, struct rpma_cq *cq,
		struct ibv_wc *wc)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;

	ret = rpma_cq_get_wc(cq, 1, wc, NULL);
	if (ret == RPMA_E_NO_COMPLETION) {
		/* lack of completion is not an error */
		return 0;
	}
	if (ret) {
		librpma_td_verror(td, ret, "rpma_cq_get_wc");
		goto err_terminate;
	}

	/* an unsuccessful completion is fatal */
	if (wc->status != IBV_WC_SUCCESS)
		goto err_terminate;

	/* a completed SEND releases one SQ slot */
	if (wc->opcode == IBV_WC_SEND)
		++sd->msg_sqe_available;

	return 1;

err_terminate:
	td->terminate = true;

	return -1;
}
727697
/*
 * server_cmpl_wait_and_poll - wait, poll and process a completion
 *
 * First tries a non-blocking poll; if no completion is ready, blocks in
 * rpma_cq_wait() for a completion event and then polls once more.
 *
 * Return value:
 * 0 or 1 - number of received completions
 * -1 - in case of an error (td->terminate is set)
 */
static int server_cmpl_wait_and_poll(struct thread_data *td, struct rpma_cq *cq,
		struct ibv_wc *wc)
{
	int ret;

	/* fast path: a completion may already be available */
	ret = server_cmpl_poll(td, cq, wc);
	if (ret)
		return ret;

	/* lack of completion - wait for a completion event */
	ret = rpma_cq_wait(cq);
	if (ret == RPMA_E_NO_COMPLETION) {
		/* lack of completion is not an error */
		return 0;
	}
	if (ret) {
		librpma_td_verror(td, ret, "rpma_cq_wait");
		td->terminate = true;
		return -1;
	}

	/* the event fired - poll for the completion itself */
	return server_cmpl_poll(td, cq, wc);
}
728+
/*
 * server_queue_process - progress both completion queues and serve
 * at most one incoming request
 *
 * Drains the main CQ (reclaiming SQ slots from SEND completions), then
 * looks for one receive completion on the receive CQ using the selected
 * strategy (busy-poll or wait-and-poll). When a request arrives, spins on
 * the main CQ until at least one SQ slot is free for the response, then
 * hands the request to server_qe_process().
 *
 * Returns 0 on success (including "nothing to do"), negative on error.
 */
static inline int server_queue_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	struct ibv_wc cq_wc, rcq_wc;
	int ret;

	/* process as many send completions as possible */
	while ((ret = server_cmpl_poll(td, csd->cq, &cq_wc))) {
		if (ret < 0)
			return ret;
	}

	/* process the receive completion */
	ret = server_cmpl_process_func(td, sd->rcq, &rcq_wc);
	if (ret != 1)
		return ret; /* 0 - no request pending; -1 - error */

	/* ret == 1 means rcq_wc.opcode == IBV_WC_RECV */

	/* ensure that at least one SQ slot is available */
	while (sd->msg_sqe_available == 0) {
		/* process the send completion */
		ret = server_cmpl_process_func(td, csd->cq, &cq_wc);
		if (ret < 0)
			return ret;
	}

	return server_qe_process(td, &rcq_wc);
}
759+
728760static enum fio_q_status server_queue (struct thread_data * td , struct io_u * io_u )
729761{
730762 do {
731- if (server_cmpl_process (td ))
732- return FIO_Q_BUSY ;
733-
734763 if (server_queue_process (td ))
735764 return FIO_Q_BUSY ;
736765
0 commit comments