FLASH-X
Doxygen Generated Documentation From Interface Source Code
UTPipeline.F90
Go to the documentation of this file.
1#define ASSERT(x) \
2!! NOTICE
3!! Copyright 2022 UChicago Argonne, LLC and contributors
4!!
5!! Licensed under the Apache License, Version 2.0 (the "License");
6!! you may not use this file except in compliance with the License.
7!!
8!! Unless required by applicable law or agreed to in writing, software
9!! distributed under the License is distributed on an "AS IS" BASIS,
10!! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11!! See the License for the specific language governing permissions and
12!! limitations under the License.
13 if (.not.x) call Driver_abort("Assertion fail")
14#define ASSERT_MPI_SUCCESS(ierr) \
16
18
19#ifdef UTPIPELINE_UNIT_TEST
20 use mpi
21#else
23#endif
24
25 implicit none
26
27#ifdef UTPIPELINE_UNIT_TEST
28 integer, parameter :: FLASH_INTEGER = MPI_INTEGER
29 integer, parameter :: FLASH_REAL = MPI_DOUBLE_PRECISION !WARNING MUST PROMOTE REALS TO DP
30#else
31 include 'Flashx_mpi.h'
32#endif
33
34 real, allocatable, save, dimension(:,:,:) :: utpipe_sendBuf
35 real, allocatable, save, dimension(:,:,:) :: utpipe_recvBuf
36 real, allocatable, save, dimension(:,:) :: utpipe_itemBuf
37
38 integer, parameter :: utpipe_tag = 1235
39 integer, allocatable, save, dimension(:,:) :: utpipe_recvStatus
40 integer, allocatable, save, dimension(:) :: utpipe_recvRequest
41 integer, allocatable, save, dimension(:) :: utpipe_recvIndex
42 integer, allocatable, save, dimension(:) :: utpipe_recvCount
43
44 integer, allocatable, save, dimension(:,:) :: utpipe_sendStatus
45 integer, allocatable, save, dimension(:) :: utpipe_sendRequest
46 integer, allocatable, save, dimension(:) :: utpipe_sendIndex
47 integer, allocatable, save, dimension(:) :: utpipe_sendCount
48
49 integer, allocatable, save, dimension(:) :: utpipe_procList
50 integer, save :: utpipe_itemCount
51
52 integer, save :: utpipe_comm
53 integer, save :: utpipe_size
54 integer, save :: utpipe_rank
55
56 integer, save :: utpipe_itemSize
57 integer, save :: utpipe_maxItems
58 integer, save :: utpipe_channelSize
59 integer, save :: utpipe_numChannels
60
61 logical, save :: utpipe_isSendCommDone
62 logical, save :: utpipe_isRecvCommDone
63
64 logical, save :: utpipe_isInitialized = .false.
65 logical, save :: utpipe_isCommInitialized = .false.
66 logical, save :: utpipe_isCommDone = .false.
67
68 integer, save :: utpipe_logUnit
69 logical, save :: utpipe_doLog
70
71 integer, allocatable, save, dimension(:) :: utpipe_sendState
72 integer, parameter :: OPEN_STATE = -3000
73 integer, parameter :: PROMISE_TO_CLOSE_STATE = -4000
74 integer, parameter :: WAITING_TO_CLOSE_STATE = -5000
75 integer, parameter :: CLOSE_STATE = -6000
76
77contains
78
79 !Needs to be called whenever the FLASH grid changes or whenever
80 !the pipeline code is used for another purpose.
81 subroutine UTPipeline_init(itemSize, maxItems, channelSize, comm, numChannels, procList, logUnit)
82 implicit none
83 integer, intent(IN) :: itemSize, maxItems, channelSize, comm, numChannels
84 integer, dimension(:), intent(IN) :: procList
85 integer, optional, intent(IN) :: logUnit
86 integer :: ierr
87
88 utpipe_itemSize = itemSize
89 utpipe_maxItems = maxItems
90 utpipe_channelSize = channelSize
91 utpipe_comm = comm
92 utpipe_numChannels = numChannels
93
95 utpipe_doLog = .false.
96 if (present(logUnit)) then
97 utpipe_doLog = (logUnit >= 0) !Must be a valid Fortran unit
98 if (utpipe_doLog) utpipe_logUnit = logUnit
99 end if
100
101 if (numChannels > 0) then
102 allocate(utpipe_itemBuf(itemSize,maxItems))
103 allocate(utpipe_sendBuf(itemSize,channelSize,numChannels))
104 allocate(utpipe_sendStatus(MPI_STATUS_SIZE,numChannels))
105 allocate(utpipe_sendRequest(numChannels))
106 allocate(utpipe_sendIndex(numChannels))
107 allocate(utpipe_sendCount(numChannels))
108 allocate(utpipe_sendState(numChannels))
109 allocate(utpipe_recvBuf(itemSize,channelSize,numChannels))
110 allocate(utpipe_recvStatus(MPI_STATUS_SIZE,numChannels))
111 allocate(utpipe_recvRequest(numChannels))
112 allocate(utpipe_recvIndex(numChannels))
113 allocate(utpipe_recvCount(numChannels))
114 allocate(utpipe_procList(numChannels))
115 utpipe_procList(1:numChannels) = procList(1:numChannels)
116 utpipe_sendRequest = MPI_REQUEST_NULL
117 utpipe_recvRequest = MPI_REQUEST_NULL
118 end if
119
120 call MPI_Comm_rank(utpipe_comm, utpipe_rank, ierr)
121 ASSERT_MPI_SUCCESS(ierr)
122 call MPI_Comm_size(utpipe_comm, utpipe_size, ierr)
123 ASSERT_MPI_SUCCESS(ierr)
124
125 utpipe_isInitialized = .true.
126 end subroutine UTPipeline_init
127
128
130 implicit none
131 integer :: i
132 if (.not.utpipe_isInitialized) then
133 call Driver_abort("Must intialize the pipeline first")
134 end if
135
137 if (utpipe_numChannels > 0) then
138 utpipe_sendCount(:) = 0
139 utpipe_recvCount(:) = 0
140 utpipe_isCommDone = .false.
141 utpipe_isSendCommDone = .false.
142 utpipe_isRecvCommDone = .false.
143 do i = 1, utpipe_numChannels
145 call utpipe_postRecvMsg(i)
146 end do
147 else
148 utpipe_isCommDone = .true.
149 utpipe_isSendCommDone = .true.
150 utpipe_isRecvCommDone = .true.
151 end if
153 end subroutine UTPipeline_initComm
154
155
156 subroutine UTPipeline_finalizeComm(doAsyncReturn)
157 implicit none
158 logical, optional, intent(IN) :: doAsyncReturn
159 integer :: i, ierr
160 logical :: doSyncReturn
161
162 if (utpipe_numChannels > 0) then
163 do i = 1, utpipe_numChannels
164 if (utpipe_recvRequest(i) /= MPI_REQUEST_NULL) then
165 call MPI_Cancel(utpipe_recvRequest(i), ierr)
166 ASSERT_MPI_SUCCESS(ierr)
167 end if
168 end do
169 call MPI_Waitall(utpipe_numChannels, utpipe_recvRequest, &
170 utpipe_recvStatus, ierr)
171 ASSERT_MPI_SUCCESS(ierr)
172 utpipe_isRecvCommDone = .true.
173
174
175 do i = 1, utpipe_numChannels
176 if (utpipe_sendRequest(i) /= MPI_REQUEST_NULL) then
177 call MPI_Cancel(utpipe_sendRequest(i), ierr)
178 ASSERT_MPI_SUCCESS(ierr)
179 end if
180 end do
181 call MPI_Waitall(utpipe_numChannels, utpipe_sendRequest, &
182 utpipe_sendStatus, ierr)
183 ASSERT_MPI_SUCCESS(ierr)
185 utpipe_isSendCommDone = .true.
186 end if
187
188 if (utpipe_size > 1) then
189 if (present(doAsyncReturn)) then
190 doSyncReturn = .not.doAsyncReturn
191 else
192 doSyncReturn = .true.
193 end if
194 if (doSyncReturn) then
195 call MPI_Barrier(utpipe_comm, ierr)
196 ASSERT_MPI_SUCCESS(ierr)
197 end if
198 end if
199 utpipe_isCommDone = .true.
201 end subroutine UTPipeline_finalizeComm
202
203
205 implicit none
207 call UTPipeline_finalizeComm(doAsyncReturn=.true.)
208 end if
209
210 if (utpipe_isInitialized .and. utpipe_numChannels > 0) then
211 deallocate(utpipe_itemBuf)
212 deallocate(utpipe_sendBuf)
213 deallocate(utpipe_sendStatus)
214 deallocate(utpipe_sendRequest)
215 deallocate(utpipe_sendIndex)
216 deallocate(utpipe_sendCount)
217 deallocate(utpipe_sendState)
218 deallocate(utpipe_recvBuf)
219 deallocate(utpipe_recvStatus)
220 deallocate(utpipe_recvRequest)
221 deallocate(utpipe_recvIndex)
222 deallocate(utpipe_recvCount)
223 deallocate(utpipe_procList)
224 utpipe_isInitialized = .false.
225 end if
226 end subroutine UTPipeline_finalize
227
228
230 implicit none
231 integer :: outcount, index, ierr, i, msgLen, procID
232 logical :: isSaved
233
234 if (utpipe_numChannels > 0 .and. .not.utpipe_isRecvCommDone) then
235 !Copy old items from receive channels in which there are no pending MPI
236 !receives. This only happens when there was previously insufficient
237 !space in utpipe_itemBuf.
239
240 !Test all receive channels for new messages. Save the corresponding
241 !items and then post a new receive.
242 call MPI_Testsome(utpipe_NumChannels, utpipe_recvRequest, outcount, &
244 ASSERT_MPI_SUCCESS(ierr)
245
246 do i = 1, outcount
247 index = utpipe_recvIndex(i)
248 procID = utpipe_recvStatus(MPI_SOURCE,i)
249 if (procID /= utpipe_procList(index)) then
250 call Driver_abort("ProcID mismatch")
251 end if
252
253 call MPI_Get_count(utpipe_recvStatus(:,i), FLASH_REAL, msgLen, ierr)
254 ASSERT_MPI_SUCCESS(ierr)
255 utpipe_recvCount(index) = msgLen / utpipe_itemSize
256 if (utpipe_doLog) then
257 write(utpipe_logUnit,'(2(a,i6))') 'Received ', &
258 utpipe_recvCount(index), ' elements from ', procID
259 end if
260
261 !A zero-byte message means the send channel is closed.
262 if (msgLen > 0) then
263 call utpipe_saveRecvItems(index, isSaved)
264 if (isSaved) call utpipe_postRecvMsg(index)
265 end if
266 end do
267
268 !Check for completion
270 all(utpipe_recvRequest == MPI_REQUEST_NULL .and. &
271 utpipe_recvCount == 0)
272 end if
273 end subroutine UTPipeline_progressRecvComm
274
275
277 implicit none
278 integer :: i
279 logical :: isSaved
280 do i = 1, utpipe_numChannels
281 if ( utpipe_recvRequest(i) == MPI_REQUEST_NULL .and. &
282 utpipe_recvCount(i) > 0 ) then
283 call utpipe_saveRecvItems(i, isSaved)
284 if (isSaved) call utpipe_postRecvMsg(i)
285 end if
286 end do
287 end subroutine utpipe_handleOldRecvMsg
288
289
291 implicit none
292 integer, parameter :: notFound = -1 !This value must be negative
293 integer :: fullestChannel, bufSize, i
294
295 if (utpipe_numChannels > 0) then
296 if (any(utpipe_sendCount(:) > 0)) then
297 fullestChannel = notFound
298 bufSize = notFound
299 do i = 1, utpipe_numChannels
300 !Test for data that is not currently being sent
301 if ( utpipe_sendState(i) == OPEN_STATE .and. &
302 utpipe_sendRequest(i) == MPI_REQUEST_NULL .and. &
303 utpipe_sendCount(i) > 0 ) then
304 if (utpipe_sendCount(i) > bufSize) then
305 fullestChannel = i
306 bufSize = utpipe_sendCount(i)
307 end if
308 end if
309 end do
310 !It is possible that there are no sends meeting the above criteria
311 if (fullestChannel >= 1 .and. fullestChannel <= utpipe_numChannels) then
312 call utpipe_postSendMsg(fullestChannel)
313 end if
314 end if
315 end if
316 end subroutine UTPipeline_sendFullestChannel
317
318
319 subroutine utpipe_postSendMsg(index)
320 implicit none
321 integer, intent(IN) :: index
322 integer :: procID, msgSize, ierr
323
324 msgSize = utpipe_sendCount(index)
325 if (msgSize >= 0) then
326 procID = utpipe_procList(index)
327 if (utpipe_doLog) then
328 write(utpipe_logUnit,'(2(a,i6))') 'Post send msg of ', &
329 msgSize, ' to ', procID
330 end if
331 call MPI_Isend(utpipe_sendBuf(1,1,index), utpipe_itemSize*msgSize, &
333 utpipe_sendRequest(index), ierr)
334 ASSERT_MPI_SUCCESS(ierr)
335 end if
336 utpipe_isSendCommDone = .false.
337 end subroutine utpipe_postSendMsg
338
339
340 subroutine utpipe_postRecvMsg(index)
341 implicit none
342 integer, intent(IN) :: index
343 integer :: procID, ierr
344
345 procID = utpipe_procList(index)
346 if (utpipe_doLog) then
347 write(utpipe_logUnit,'(a,i6)') 'Post receive msg from ', procID
348 end if
349 call MPI_Irecv(utpipe_recvBuf(1,1,index), &
352 ASSERT_MPI_SUCCESS(ierr)
353 utpipe_isRecvCommDone = .false.
354 end subroutine utpipe_postRecvMsg
355
356
357 subroutine UTPipeline_progressComm(doFlush)
358 implicit none
359 logical, optional, intent(IN) :: doFlush
362
363 !The following code guarantees global progress. It will normally
364 !be executed when we are processing the last few items
365 if (present(doFlush)) then
366 if (doFlush .and. utpipe_itemCount == 0) then
368 end if
369 end if
370 end subroutine UTPipeline_progressComm
371
372
373 !Rename UTPipeline_progressSendComm
375 implicit none
376 integer :: outcount, index, ierr, i
377
378 if (utpipe_numChannels > 0 .and. .not.utpipe_isSendCommDone) then
379
381
382 call MPI_Testsome(utpipe_NumChannels, utpipe_sendRequest, &
383 outcount, utpipe_sendIndex, utpipe_sendStatus, ierr)
384 ASSERT_MPI_SUCCESS(ierr)
385
386 !Note that status objects are only meaningful for receive messages.
387 do i = 1, outcount
388 index = utpipe_sendIndex(i)
389
390 utpipe_sendCount(index) = 0
391 if (utpipe_sendState(index) == WAITING_TO_CLOSE_STATE) then
393 end if
394 if (utpipe_doLog) then
395 write(utpipe_logUnit,'(a,i6)') 'Completed send msg to ', &
396 utpipe_procList(index)
397 end if
398 end do
399
400 !Check for completion
402 if (utpipe_isSendCommDone .and. &
403 any(utpipe_sendRequest /= MPI_REQUEST_NULL .or. &
404 utpipe_sendCount /= 0)) then
405 call Driver_abort('Bad shutdown')
406 end if
407 end if
408 end subroutine UTPipeline_progressSendComm
409
410
411 !Promise to close the send channels.
412 subroutine UTPipeline_closeSendChannels(isClosing)
413 implicit none
414 logical, intent(OUT) :: isClosing
415 integer :: i
416 if (utpipe_numChannels > 0) then
417 do i = 1, utpipe_numChannels
418 if (utpipe_sendState(i) == OPEN_STATE) then
420 end if
421 end do
423 end if
424 isClosing = .true.
425 end subroutine UTPipeline_closeSendChannels
426
427
428 !Fulfill the close promise by sending a zero-byte notification message
430 implicit none
431 integer :: i
432 do i = 1, utpipe_numChannels
434 utpipe_sendRequest(i) == MPI_REQUEST_NULL ) then
435 utpipe_sendCount(i) = 0 !For a zero-byte message
436 call utpipe_postSendMsg(i)
438 end if
439 end do
440 end subroutine utpipe_progressClosePromise
441
442
443 !Call this after UTPipeline_closeSendChannels
444 subroutine UTPipeline_isCommDone(isCommDone)
445 implicit none
446 logical, intent(OUT) :: isCommDone
447
448 if (.not.utpipe_isCommDone) then
450 call UTPipeline_finalizeComm(doAsyncReturn=.true.)
451 else
453 end if
454 end if
455 isCommDone = utpipe_isCommDone
456 end subroutine UTPipeline_isCommDone
457
458
459 subroutine UTPipeline_isDone(isDone)
460 implicit none
461 logical, intent(OUT) :: isDone
462 logical :: isCommDone
463
464 call UTPipeline_isCommDone(isCommDone)
465 isDone = isCommDone .and. utpipe_itemCount == 0
466 end subroutine UTPipeline_isDone
467
468
469 subroutine UTPipeline_numItems(numItems)
470 implicit none
471 integer, intent(OUT) :: numItems
472 numItems = utpipe_itemCount
473 end subroutine UTPipeline_numItems
474
475
476 !Remove this subroutine
477 subroutine UTPipeline_isEmpty(isEmpty)
478 implicit none
479 logical, intent(OUT) :: isEmpty
480 logical :: isCommBufEmpty
481
482 if (utpipe_numChannels > 0) then
483 isCommBufEmpty = &
484 all(utpipe_sendCount(:) == 0 .and. utpipe_recvCount(:) == 0)
485 else
486 isCommBufEmpty = .true.
487 end if
488 isEmpty = (utpipe_itemCount == 0 .and. isCommBufEmpty)
489 end subroutine UTPipeline_isEmpty
490
491
492 subroutine UTPipeline_getItems(userArray, userMaxCount, userCount)
493 implicit none
494 real, dimension(:,:), intent(INOUT) :: userArray
495 integer, intent(IN) :: userMaxCount
496 integer, intent(INOUT) :: userCount
497 integer :: freeSpace, itemsToCopy, firstItem
498
499 freeSpace = userMaxCount - userCount
500 itemsToCopy = min(utpipe_itemCount, freeSpace)
501 if (itemsToCopy > 0) then
502 !Copy from the end of utpipe_itemBuf to allow for a fast memcpy
503 firstItem = utpipe_itemCount - itemsToCopy + 1
504
505 if (utpipe_doLog) then
506 write(utpipe_logUnit,'(2(a,2(i6)))') 'Copy from buf slice ', &
507 firstItem, firstItem+itemsToCopy-1, ' to user slice ', &
508 userCount+1, userCount+itemsToCopy
509 end if
510
511 userArray(:,userCount+1:userCount+itemsToCopy) = &
512 utpipe_itemBuf(:,firstItem:firstItem+itemsToCopy-1)
513
514 utpipe_itemCount = utpipe_itemCount - itemsToCopy
515 userCount = userCount + itemsToCopy
516
517 if (utpipe_doLog) then
518 write(utpipe_logUnit,'(2(a,i6))') 'Elements in user array ', &
519 userCount, ' Elements in buf ', utpipe_itemCount
520 end if
521 end if
522 end subroutine UTPipeline_getItems
523
524
525 subroutine utpipe_saveRecvItems(index, isSaved)
526 implicit none
527 integer, intent(IN) :: index
528 logical, intent(OUT) :: isSaved
529 integer :: numItems
530
531 numItems = utpipe_recvCount(index)
532 if (numItems > 0) then
533 if (utpipe_itemCount + numItems <= utpipe_maxItems) then
534
535 if (utpipe_doLog) then
536 write(utpipe_logUnit,'(a,i6,2(a,2(i6)))') 'Handle receive from ', &
537 utpipe_procList(index), ' copy from msg slice ', &
538 1, numItems, ' to buf slice ', utpipe_itemCount+1, &
539 utpipe_itemCount+numItems
540 end if
541
543 utpipe_recvBuf(:,1:numItems,index)
545 utpipe_recvCount(index) = 0
546 isSaved = .true.
547 else
548 isSaved = .false.
549 end if
550 end if
551 end subroutine utpipe_saveRecvItems
552
553
554 !Caller should probably add the following: if (isHandled) item = NONEXISTENT
555 subroutine UTPipeline_sendItem(item, procID, isHandled)
556 implicit none
557 real, dimension(:), intent(IN) :: item
558 integer, intent(IN) :: procID
559 logical, intent(OUT) :: isHandled
560 integer :: channel, ptr, i
561 integer, parameter :: notFound = -1
562
563 !It may be necessary to change the utpipe_procList data structure
564 !to make the lookup faster.
565 channel = notFound
566 do i = 1, utpipe_numChannels
567 if (utpipe_procList(i) == procID) then
568 channel = i
569 exit
570 end if
571 end do
572 if (channel == notFound) call Driver_abort("Msg channel not found")
573
574 !If there is a pending send in our desired channel we test all
575 !send channels. Request values are reset to MPI_REQUEST_NULL when
576 !sends complete.
577 if (utpipe_sendRequest(channel) /= MPI_REQUEST_NULL) then
579 end if
580
581 !We can safetly add items to the send buffer if there is no pending send.
582 if ( utpipe_sendState(channel) == OPEN_STATE .and. &
583 utpipe_sendRequest(channel) == MPI_REQUEST_NULL ) then
584 ptr = utpipe_sendCount(channel) + 1
585 if (ptr > utpipe_channelSize) call Driver_abort("Counting error")
586 utpipe_sendBuf(:,ptr,channel) = item(:)
587 utpipe_sendCount(channel) = ptr !Array is needed in utpipe_postSendMsg
588
589 if (utpipe_sendCount(channel) == utpipe_channelSize) then
590 call utpipe_postSendMsg(channel)
591 end if
592 isHandled = .true.
593 else
594 isHandled = .false.
595 end if
596 end subroutine UTPipeline_sendItem
597
598
599 !We allow the caller to see the internal state of the pipeline
600 !message exchange.
601 subroutine UTPipeline_iterateItems(readOnlyFn)
602 implicit none
603 interface
604 subroutine readOnlyFn(item, itemDescription)
605 implicit none
606 real, dimension(:), intent(IN) :: item
607 character(len=*), intent(IN) :: itemDescription
608 end subroutine readOnlyFn
609 end interface
610 integer :: i, n
611 character(len=100) :: itemDescription
612
613 do i = 1, utpipe_itemCount
614 write (itemDescription,'(a,i10)') 'itemBuf: item ', i
615 call readOnlyFn(utpipe_itemBuf(:,i), trim(itemDescription))
616 end do
617
618 !"The sender should not modify any part of the send buffer after a
619 !nonblocking send operation is called, until the send completes."
620 ![MPI-3 3.7.2]. "(the send operation itself leaves the content of
621 !the send buffer unchanged)" [MPI-3 3.7.3]
622 !... it should therefore always be OK to read what is there.
623 do n = 1, utpipe_numChannels
624 if (utpipe_sendCount(n) > 0) then
625 do i = 1, utpipe_sendCount(n)
626 write (itemDescription,'(2(a,i10))') 'sendBuf: channel ', n, &
627 & ', item ', i
628 call readOnlyFn(utpipe_sendBuf(:,i,n), trim(itemDescription))
629 end do
630 end if
631 end do
632
633 !"The receiver should not access any part of the receive buffer
634 !after a nonblocking receive operation is called, until the
635 !receive completes." [MPI-3 3.7.2].
636 !... it is not OK to read what is there.
637 do n = 1, utpipe_numChannels
638 if ( utpipe_recvCount(n) > 0 .and. &
639 utpipe_recvRequest(n) == MPI_REQUEST_NULL ) then
640 do i = 1, utpipe_recvCount(n)
641 write (itemDescription,'(2(a,i10))') 'recvBuf: channel ', n, &
642 & ', item ', i
643 call readOnlyFn(utpipe_recvBuf(:,i,n), trim(itemDescription))
644 end do
645 end if
646 end do
647 end subroutine UTPipeline_iterateItems
648
649
650#ifdef UTPIPELINE_UNIT_TEST
651 subroutine Driver_checkMPIErrorCode(errorCode)
652 implicit none
653 integer, intent(IN) :: errorCode
654 if (errorCode /= MPI_SUCCESS) call Driver_abort('Error in MPI')
655 end subroutine Driver_checkMPIErrorCode
656
657 subroutine Driver_abort(msg)
658 implicit none
659 character (len=*), intent(IN) :: msg
660 print *, "ERROR!!! ", msg
661 stop
662 end subroutine Driver_abort
663#endif
664
665end module UTPipeline
subroutine Driver_abort(errorMessage)
Subroutine Driver_checkMPIErrorCode(errorCode)
subroutine utpipe_saveRecvItems(index, isSaved)
Definition: UTPipeline.F90:526
integer, parameter OPEN_STATE
Definition: UTPipeline.F90:72
integer, parameter FLASH_REAL
Definition: UTPipeline.F90:29
integer, save utpipe_logUnit
Definition: UTPipeline.F90:68
subroutine utpipe_postRecvMsg(index)
Definition: UTPipeline.F90:341
subroutine UTPipeline_numItems(numItems)
Definition: UTPipeline.F90:470
integer, parameter CLOSE_STATE
Definition: UTPipeline.F90:75
real, dimension(:,:,:), allocatable, save utpipe_sendBuf
Definition: UTPipeline.F90:34
logical, save utpipe_isCommInitialized
Definition: UTPipeline.F90:65
subroutine UTPipeline_closeSendChannels(isClosing)
Definition: UTPipeline.F90:413
integer, save utpipe_channelSize
Definition: UTPipeline.F90:58
logical, save utpipe_isCommDone
Definition: UTPipeline.F90:66
integer, save utpipe_comm
Definition: UTPipeline.F90:52
logical, save utpipe_isRecvCommDone
Definition: UTPipeline.F90:62
subroutine UTPipeline_progressRecvComm
Definition: UTPipeline.F90:230
subroutine UTPipeline_init(itemSize, maxItems, channelSize, comm, numChannels, procList, logUnit)
Definition: UTPipeline.F90:82
logical, save utpipe_isInitialized
Definition: UTPipeline.F90:64
integer, save utpipe_numChannels
Definition: UTPipeline.F90:59
integer, dimension(:), allocatable, save utpipe_sendState
Definition: UTPipeline.F90:71
integer, dimension(:), allocatable, save utpipe_recvCount
Definition: UTPipeline.F90:42
integer, dimension(:), allocatable, save utpipe_recvIndex
Definition: UTPipeline.F90:41
integer, dimension(:), allocatable, save utpipe_sendRequest
Definition: UTPipeline.F90:45
subroutine UTPipeline_isEmpty(isEmpty)
Definition: UTPipeline.F90:478
subroutine UTPipeline_iterateItems(readOnlyFn)
Definition: UTPipeline.F90:602
subroutine utpipe_handleOldRecvMsg()
Definition: UTPipeline.F90:277
integer, dimension(:), allocatable, save utpipe_procList
Definition: UTPipeline.F90:49
integer, save utpipe_itemCount
Definition: UTPipeline.F90:50
integer, parameter FLASH_INTEGER
Definition: UTPipeline.F90:28
subroutine UTPipeline_getItems(userArray, userMaxCount, userCount)
Definition: UTPipeline.F90:493
real, dimension(:,:,:), allocatable, save utpipe_recvBuf
Definition: UTPipeline.F90:35
subroutine utpipe_postSendMsg(index)
Definition: UTPipeline.F90:320
integer, save utpipe_size
Definition: UTPipeline.F90:53
subroutine UTPipeline_progressComm(doFlush)
Definition: UTPipeline.F90:358
subroutine UTPipeline_isCommDone(isCommDone)
Definition: UTPipeline.F90:445
subroutine UTPipeline_finalizeComm(doAsyncReturn)
Definition: UTPipeline.F90:157
subroutine UTPipeline_initComm()
Definition: UTPipeline.F90:130
integer, parameter WAITING_TO_CLOSE_STATE
Definition: UTPipeline.F90:74
integer, save utpipe_rank
Definition: UTPipeline.F90:54
subroutine UTPipeline_isDone(isDone)
Definition: UTPipeline.F90:460
subroutine UTPipeline_sendItem(item, procID, isHandled)
Definition: UTPipeline.F90:556
logical, save utpipe_doLog
Definition: UTPipeline.F90:69
integer, parameter PROMISE_TO_CLOSE_STATE
Definition: UTPipeline.F90:73
subroutine utpipe_progressClosePromise()
Definition: UTPipeline.F90:430
subroutine UTPipeline_finalize
Definition: UTPipeline.F90:205
subroutine UTPipeline_sendFullestChannel
Definition: UTPipeline.F90:291
integer, dimension(:), allocatable, save utpipe_sendIndex
Definition: UTPipeline.F90:46
integer, parameter utpipe_tag
Definition: UTPipeline.F90:38
integer, dimension(:), allocatable, save utpipe_sendCount
Definition: UTPipeline.F90:47
integer, save utpipe_itemSize
Definition: UTPipeline.F90:56
logical, save utpipe_isSendCommDone
Definition: UTPipeline.F90:61
subroutine UTPipeline_progressSendComm()
Definition: UTPipeline.F90:375
integer, dimension(:), allocatable, save utpipe_recvRequest
Definition: UTPipeline.F90:40
real, dimension(:,:), allocatable, save utpipe_itemBuf
Definition: UTPipeline.F90:36
integer, dimension(:,:), allocatable, save utpipe_sendStatus
Definition: UTPipeline.F90:44
integer, dimension(:,:), allocatable, save utpipe_recvStatus
Definition: UTPipeline.F90:39
integer, save utpipe_maxItems
Definition: UTPipeline.F90:57