14#define ASSERT_MPI_SUCCESS(ierr) \
19#ifdef UTPIPELINE_UNIT_TEST
27#ifdef UTPIPELINE_UNIT_TEST
31 include 'Flashx_mpi.h'
81 subroutine UTPipeline_init(itemSize, maxItems, channelSize, comm, numChannels, procList, logUnit)
83 integer,
intent(IN) :: itemSize, maxItems, channelSize, comm, numChannels
84 integer, dimension(:),
intent(IN) :: procList
85 integer,
optional,
intent(IN) :: logUnit
96 if (
present(logUnit))
then
101 if (numChannels
> 0)
then
121 ASSERT_MPI_SUCCESS(ierr)
123 ASSERT_MPI_SUCCESS(ierr)
158 logical,
optional,
intent(IN) :: doAsyncReturn
160 logical :: doSyncReturn
166 ASSERT_MPI_SUCCESS(ierr)
171 ASSERT_MPI_SUCCESS(ierr)
178 ASSERT_MPI_SUCCESS(ierr)
183 ASSERT_MPI_SUCCESS(ierr)
189 if (
present(doAsyncReturn))
then
190 doSyncReturn
= .not.doAsyncReturn
192 doSyncReturn
= .true.
194 if (doSyncReturn)
then
196 ASSERT_MPI_SUCCESS(ierr)
231 integer :: outcount, index, ierr, i, msgLen, procID
244 ASSERT_MPI_SUCCESS(ierr)
254 ASSERT_MPI_SUCCESS(ierr)
292 integer,
parameter :: notFound
= -1
293 integer :: fullestChannel, bufSize, i
297 fullestChannel
= notFound
321 integer,
intent(IN) :: index
322 integer :: procID, msgSize, ierr
325 if (msgSize
>= 0)
then
329 msgSize,
' to ', procID
334 ASSERT_MPI_SUCCESS(ierr)
342 integer,
intent(IN) :: index
343 integer :: procID, ierr
352 ASSERT_MPI_SUCCESS(ierr)
359 logical,
optional,
intent(IN) :: doFlush
365 if (
present(doFlush))
then
376 integer :: outcount, index, ierr, i
384 ASSERT_MPI_SUCCESS(ierr)
414 logical,
intent(OUT) :: isClosing
446 logical,
intent(OUT) :: isCommDone
461 logical,
intent(OUT) :: isDone
462 logical :: isCommDone
471 integer,
intent(OUT) :: numItems
479 logical,
intent(OUT) :: isEmpty
480 logical :: isCommBufEmpty
486 isCommBufEmpty
= .true.
494 real, dimension(:,:),
intent(INOUT) :: userArray
495 integer,
intent(IN) :: userMaxCount
496 integer,
intent(INOUT) :: userCount
497 integer :: freeSpace, itemsToCopy, firstItem
499 freeSpace
= userMaxCount
- userCount
501 if (itemsToCopy
> 0)
then
507 firstItem, firstItem
+itemsToCopy
-1,
' to user slice ',
&
508 userCount
+1, userCount
+itemsToCopy
511 userArray(:,userCount
+1:userCount
+itemsToCopy)
= &
515 userCount
= userCount
+ itemsToCopy
527 integer,
intent(IN) :: index
528 logical,
intent(OUT) :: isSaved
532 if (numItems
> 0)
then
536 write(
utpipe_logUnit,
'(a,i6,2(a,2(i6)))')
'Handle receive from ',
&
557 real, dimension(:),
intent(IN) :: item
558 integer,
intent(IN) :: procID
559 logical,
intent(OUT) :: isHandled
560 integer :: channel, ptr, i
561 integer,
parameter :: notFound
= -1
572 if (channel
== notFound)
call Driver_abort(
"Msg channel not found")
604 subroutine readOnlyFn(item, itemDescription)
606 real, dimension(:),
intent(IN) :: item
607 character(len
=*),
intent(IN) :: itemDescription
608 end subroutine readOnlyFn
611 character(
len=100) :: itemDescription
614 write (itemDescription,
'(a,i10)')
'itemBuf: item ', i
626 write (itemDescription,
'(2(a,i10))')
'sendBuf: channel ', n,
&
641 write (itemDescription,
'(2(a,i10))')
'recvBuf: channel ', n,
&
650#ifdef UTPIPELINE_UNIT_TEST
653 integer,
intent(IN) :: errorCode
654 if (errorCode
/= MPI_SUCCESS)
call Driver_abort(
'Error in MPI')
659 character (len
=*),
intent(IN) :: msg
660 print *,
"ERROR!!! ", msg
subroutine Driver_abort(errorMessage)
Subroutine Driver_checkMPIErrorCode(errorCode)
subroutine utpipe_saveRecvItems(index, isSaved)
integer, parameter OPEN_STATE
integer, parameter FLASH_REAL
integer, save utpipe_logUnit
subroutine utpipe_postRecvMsg(index)
subroutine UTPipeline_numItems(numItems)
integer, parameter CLOSE_STATE
real, dimension(:,:,:), allocatable, save utpipe_sendBuf
logical, save utpipe_isCommInitialized
subroutine UTPipeline_closeSendChannels(isClosing)
integer, save utpipe_channelSize
logical, save utpipe_isCommDone
integer, save utpipe_comm
logical, save utpipe_isRecvCommDone
subroutine UTPipeline_progressRecvComm
subroutine UTPipeline_init(itemSize, maxItems, channelSize, comm, numChannels, procList, logUnit)
logical, save utpipe_isInitialized
integer, save utpipe_numChannels
integer, dimension(:), allocatable, save utpipe_sendState
integer, dimension(:), allocatable, save utpipe_recvCount
integer, dimension(:), allocatable, save utpipe_recvIndex
integer, dimension(:), allocatable, save utpipe_sendRequest
subroutine UTPipeline_isEmpty(isEmpty)
subroutine UTPipeline_iterateItems(readOnlyFn)
subroutine utpipe_handleOldRecvMsg()
integer, dimension(:), allocatable, save utpipe_procList
integer, save utpipe_itemCount
integer, parameter FLASH_INTEGER
subroutine UTPipeline_getItems(userArray, userMaxCount, userCount)
real, dimension(:,:,:), allocatable, save utpipe_recvBuf
subroutine utpipe_postSendMsg(index)
integer, save utpipe_size
subroutine UTPipeline_progressComm(doFlush)
subroutine UTPipeline_isCommDone(isCommDone)
subroutine UTPipeline_finalizeComm(doAsyncReturn)
subroutine UTPipeline_initComm()
integer, parameter WAITING_TO_CLOSE_STATE
integer, save utpipe_rank
subroutine UTPipeline_isDone(isDone)
subroutine UTPipeline_sendItem(item, procID, isHandled)
logical, save utpipe_doLog
integer, parameter PROMISE_TO_CLOSE_STATE
subroutine utpipe_progressClosePromise()
subroutine UTPipeline_finalize
subroutine UTPipeline_sendFullestChannel
integer, dimension(:), allocatable, save utpipe_sendIndex
integer, parameter utpipe_tag
integer, dimension(:), allocatable, save utpipe_sendCount
integer, save utpipe_itemSize
logical, save utpipe_isSendCommDone
subroutine UTPipeline_progressSendComm()
integer, dimension(:), allocatable, save utpipe_recvRequest
real, dimension(:,:), allocatable, save utpipe_itemBuf
integer, dimension(:,:), allocatable, save utpipe_sendStatus
integer, dimension(:,:), allocatable, save utpipe_recvStatus
integer, save utpipe_maxItems