2.1.2-beta (revision 4707)
OTF2_MPI_Collectives.h
Go to the documentation of this file.
1 /*
2  * This file is part of the Score-P software (http://www.score-p.org)
3  *
4  * Copyright (c) 2013-2014,
5  * Technische Universitaet Dresden, Germany
6  *
7  * This software may be modified and distributed under the terms of
8  * a BSD-style license. See the COPYING file in the package base
9  * directory for details.
10  *
11  */
12 
13 
383 #ifndef OTF2_MPI_COLLECTIVES_H
384 #define OTF2_MPI_COLLECTIVES_H
385 
386 
387 #include <otf2/otf2.h>
388 
389 
390 #include <mpi.h>
391 
392 
403 static OTF2_ErrorCode
405  MPI_Comm globalComm,
406  MPI_Comm localComm );
407 
408 
419 static OTF2_ErrorCode
421  MPI_Comm globalComm,
422  uint32_t numberOfFiles );
423 
424 
432 static OTF2_ErrorCode
434  MPI_Comm globalComm );
435 
436 
447 #ifdef OTF2_MPI_USE_PMPI
448 # define CALL_MPI( name ) P ## name
449 #else
450 # define CALL_MPI( name ) name
451 # define OTF2_MPI_USE_PMPI
452 # define OTF2_MPI_USE_PMPI_undef_me
453 #endif
454 
455 
463 #ifndef OTF2_MPI_UINT8_T
464 # if MPI_VERSION >= 3
465 # define OTF2_MPI_UINT8_T MPI_UINT8_T
466 # else
467 # define OTF2_MPI_UINT8_T MPI_UNSIGNED_CHAR
468 # endif
469 #endif
470 
478 #ifndef OTF2_MPI_INT8_T
479 # if MPI_VERSION >= 3
480 # define OTF2_MPI_INT8_T MPI_INT8_T
481 # else
482 # define OTF2_MPI_INT8_T MPI_CHAR
483 # endif
484 #endif
485 
486 
494 #ifndef OTF2_MPI_UINT16_T
495 # if MPI_VERSION >= 3
496 # define OTF2_MPI_UINT16_T MPI_UINT16_T
497 # else
498 # define OTF2_MPI_UINT16_T MPI_UNSIGNED_SHORT
499 # endif
500 #endif
501 
509 #ifndef OTF2_MPI_INT16_T
510 # if MPI_VERSION >= 3
511 # define OTF2_MPI_INT16_T MPI_INT16_T
512 # else
513 # define OTF2_MPI_INT16_T MPI_SHORT
514 # endif
515 #endif
516 
517 
525 #ifndef OTF2_MPI_UINT32_T
526 # if MPI_VERSION >= 3
527 # define OTF2_MPI_UINT32_T MPI_UINT32_T
528 # else
529 # define OTF2_MPI_UINT32_T MPI_UNSIGNED
530 # endif
531 #endif
532 
540 #ifndef OTF2_MPI_INT32_T
541 # if MPI_VERSION >= 3
542 # define OTF2_MPI_INT32_T MPI_INT32_T
543 # else
544 # define OTF2_MPI_INT32_T MPI_INT
545 # endif
546 #endif
547 
548 
556 #ifndef OTF2_MPI_UINT64_T
557 # define OTF2_MPI_UINT64_T MPI_UINT64_T
558 # if MPI_VERSION < 3
559 # error Please define OTF2_MPI_UINT64_T to a suitable MPI datatype for uint64_t.
560 # endif
561 #endif
562 
570 #ifndef OTF2_MPI_INT64_T
571 # define OTF2_MPI_INT64_T MPI_INT64_T
572 # if MPI_VERSION < 3
573 # error Please define OTF2_MPI_INT64 to a suitable MPI datatype for int64_t.
574 # endif
575 #endif
576 
577 
584 #ifndef OTF2_MPI_FLOAT
585 # define OTF2_MPI_FLOAT MPI_FLOAT
586 #endif
587 
588 
595 #ifndef OTF2_MPI_DOUBLE
596 # define OTF2_MPI_DOUBLE MPI_DOUBLE
597 #endif
598 
599 
608 {
609  MPI_Comm comm;
610  int size;
611  int rank;
612  int displacements[ 1 ];
613 };
614 
615 
618 typedef struct OTF2_MPI_UserData
619 {
620  OTF2_CollectiveCallbacks callbacks;
621  OTF2_CollectiveContext* global;
622  OTF2_CollectiveContext* local;
623 } OTF2_MPI_UserData;
624 
625 
626 static void
627 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks );
628 
629 
631 otf2_mpi_create_context( MPI_Comm comm,
632  bool duplicate );
633 
634 
635 static void
636 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext );
637 
638 
640 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
641  uint32_t numberOfFiles );
642 
643 
644 static OTF2_ErrorCode
646  MPI_Comm globalComm,
647  MPI_Comm localComm )
648 {
649  OTF2_ErrorCode status = OTF2_SUCCESS;
650  OTF2_MPI_UserData* user_data = NULL;
651 
654 
655  if ( !archive )
656  {
658  }
659 
660  if ( MPI_COMM_NULL == globalComm )
661  {
663  }
664 
665  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
666  if ( !user_data )
667  {
669  }
670 
671  otf2_mpi_get_collectives( &user_data->callbacks );
672 
673  user_data->global = otf2_mpi_create_context( globalComm, true );
674  if ( !user_data->global )
675  {
677  goto out;
678  }
679 
680  if ( MPI_COMM_NULL != localComm )
681  {
682  user_data->local = otf2_mpi_create_context( localComm, true );
683  if ( !user_data->local )
684  {
686  goto out;
687  }
688  }
689 
690  status = OTF2_Archive_SetCollectiveCallbacks( archive,
691  &user_data->callbacks,
692  user_data,
693  user_data->global,
694  user_data->local );
695 
696 out:
697  if ( OTF2_SUCCESS != status )
698  {
699  otf2_mpi_destroy_context( user_data->local );
700  otf2_mpi_destroy_context( user_data->global );
701  free( user_data );
702  }
703 
704  return status;
705 }
706 
707 
708 static OTF2_ErrorCode
710  MPI_Comm globalComm,
711  uint32_t numberOfFiles )
712 {
713  OTF2_ErrorCode status = OTF2_SUCCESS;
714  OTF2_MPI_UserData* user_data = NULL;
715 
718 
719  if ( !archive )
720  {
722  }
723 
724  if ( MPI_COMM_NULL == globalComm )
725  {
727  }
728 
729  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
730  if ( !user_data )
731  {
733  }
734 
735  otf2_mpi_get_collectives( &user_data->callbacks );
736 
737  user_data->global = otf2_mpi_create_context( globalComm, true );
738  if ( !user_data->global )
739  {
741  goto out;
742  }
743 
744  user_data->local = otf2_mpi_split_context_by_number( user_data->global,
745  numberOfFiles );
746  if ( !user_data->local )
747  {
749  goto out;
750  }
751 
752  status = OTF2_Archive_SetCollectiveCallbacks( archive,
753  &user_data->callbacks,
754  user_data,
755  user_data->global,
756  user_data->local );
757 
758 out:
759  if ( OTF2_SUCCESS != status )
760  {
761  otf2_mpi_destroy_context( user_data->local );
762  otf2_mpi_destroy_context( user_data->global );
763  free( user_data );
764  }
765 
766  return status;
767 }
768 
769 
770 static OTF2_ErrorCode
772  MPI_Comm globalComm )
773 {
774  OTF2_ErrorCode status = OTF2_SUCCESS;
775  OTF2_MPI_UserData* user_data = NULL;
776 
779 
780  if ( !reader )
781  {
783  }
784 
785  if ( MPI_COMM_NULL == globalComm )
786  {
788  }
789 
790  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
791  if ( !user_data )
792  {
794  }
795 
796  otf2_mpi_get_collectives( &user_data->callbacks );
797 
798  user_data->global = otf2_mpi_create_context( globalComm, true );
799  if ( !user_data->global )
800  {
802  goto out;
803  }
804 
805  status = OTF2_Reader_SetCollectiveCallbacks( reader,
806  &user_data->callbacks,
807  user_data,
808  user_data->global,
809  NULL );
810 
811 out:
812  if ( OTF2_SUCCESS != status )
813  {
814  otf2_mpi_destroy_context( user_data->global );
815  free( user_data );
816  }
817 
818  return status;
819 }
820 
821 
823 otf2_mpi_create_context( MPI_Comm comm,
824  bool duplicate )
825 {
826  int ret;
827  int size;
828 
829  ret = CALL_MPI( MPI_Comm_size ) ( comm, &size );
830  if ( MPI_SUCCESS != ret )
831  {
832  return NULL;
833  }
834 
835  OTF2_CollectiveContext* new_context =
836  ( OTF2_CollectiveContext* )malloc( sizeof( *new_context )
837  + ( ( size - 1 ) * sizeof( int ) ) );
838  if ( !new_context )
839  {
840  return NULL;
841  }
842 
843  new_context->size = size;
844  ret = CALL_MPI( MPI_Comm_rank ) ( comm, &new_context->rank );
845  if ( MPI_SUCCESS != ret )
846  {
847  free( new_context );
848  return NULL;
849  }
850 
851  if ( duplicate )
852  {
853  ret = CALL_MPI( MPI_Comm_dup ) ( comm, &new_context->comm );
854  if ( MPI_SUCCESS != ret )
855  {
856  free( new_context );
857  return NULL;
858  }
859  }
860  else
861  {
862  new_context->comm = comm;
863  }
864 
865  return new_context;
866 }
867 
868 
869 static void
870 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext )
871 {
872  if ( !collectiveContext )
873  {
874  return;
875  }
876 
877  CALL_MPI( MPI_Comm_free ) ( &collectiveContext->comm );
878 
879  free( collectiveContext );
880 }
881 
882 
884 otf2_mpi_split_context( OTF2_CollectiveContext* commContext,
885  int color,
886  int key )
887 {
888  OTF2_CollectiveContext* new_context;
889  MPI_Comm new_comm;
890  int ret;
891  ret = CALL_MPI( MPI_Comm_split ) ( commContext->comm,
892  color,
893  key,
894  &new_comm );
895  if ( MPI_SUCCESS != ret )
896  {
897  return NULL;
898  }
899 
900  new_context = otf2_mpi_create_context( new_comm, false );
901  if ( !new_context )
902  {
903  CALL_MPI( MPI_Comm_free ) ( &new_comm );
904  return NULL;
905  }
906 
907  return new_context;
908 }
909 
910 
912 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
913  uint32_t numberOfFiles )
914 {
915  int file_number = 0;
916  int rem = commContext->size % numberOfFiles;
917  int local_size = commContext->size / numberOfFiles + !!rem;
918  int local_rank = 0;
919  int local_root = 0;
920  int i;
921  for ( i = 0; i < commContext->rank; i++ )
922  {
923  local_rank++;
924  if ( local_root + local_size == i + 1 )
925  {
926  local_root += local_size;
927  file_number++;
928  local_size -= file_number == rem;
929  local_rank = 0;
930  }
931  }
932 
933  return otf2_mpi_split_context( commContext,
934  file_number,
935  local_rank );
936 }
937 
938 
939 static MPI_Datatype
940 otf2_mpi_get_type( OTF2_Type type )
941 {
942 #define case_return( TYPE, MPI_SUFFIX ) \
943  case OTF2_TYPE_ ## TYPE: \
944  return OTF2_MPI_ ## TYPE ## MPI_SUFFIX
945  switch ( type )
946  {
947  case_return( UINT8, _T );
948  case_return( INT8, _T );
949  case_return( UINT16, _T );
950  case_return( INT16, _T );
951  case_return( UINT32, _T );
952  case_return( INT32, _T );
953  case_return( UINT64, _T );
954  case_return( INT64, _T );
955  case_return( FLOAT, );
956  case_return( DOUBLE, );
957  default:
958  return MPI_DATATYPE_NULL;
959  }
960 #undef case_return
961 }
962 
963 
964 static void
965 otf2_mpi_collectives_release( void* userData,
966  OTF2_CollectiveContext* globalCommContext,
967  OTF2_CollectiveContext* localCommContext )
968 {
969  OTF2_MPI_UserData* user_data = ( OTF2_MPI_UserData* )userData;
970 
971  ( void )globalCommContext;
972  ( void )localCommContext;
973 
974  otf2_mpi_destroy_context( user_data->global );
975  otf2_mpi_destroy_context( user_data->local );
976  free( user_data );
977 }
978 
979 
980 static OTF2_CallbackCode
981 otf2_mpi_collectives_create_local_comm( void* userData,
982  OTF2_CollectiveContext** localCommContextOut,
983  OTF2_CollectiveContext* globalCommContext,
984  uint32_t globalRank,
985  uint32_t globalSize,
986  uint32_t localRank,
987  uint32_t localSize,
988  uint32_t fileNumber,
989  uint32_t numberOfFiles )
990 {
991  ( void )userData;
992  ( void )globalRank;
993  ( void )globalSize;
994  ( void )localSize;
995  ( void )numberOfFiles;
996 
997  *localCommContextOut = otf2_mpi_split_context( globalCommContext,
998  fileNumber,
999  localRank );
1000 
1001  return *localCommContextOut
1004 }
1005 
1006 
1007 static OTF2_CallbackCode
1008 otf2_mpi_collectives_free_local_comm( void* userData,
1009  OTF2_CollectiveContext* localCommContext )
1010 {
1011  ( void )userData;
1012 
1013  otf2_mpi_destroy_context( localCommContext );
1014 
1015  return OTF2_CALLBACK_SUCCESS;
1016 }
1017 
1018 
1019 static OTF2_CallbackCode
1020 otf2_mpi_collectives_get_size( void* userData,
1021  OTF2_CollectiveContext* commContext,
1022  uint32_t* size )
1023 {
1024  ( void )userData;
1025 
1026  *size = commContext->size;
1027 
1028  return OTF2_CALLBACK_SUCCESS;
1029 }
1030 
1031 
1032 static OTF2_CallbackCode
1033 otf2_mpi_collectives_get_rank( void* userData,
1034  OTF2_CollectiveContext* commContext,
1035  uint32_t* rank )
1036 {
1037  ( void )userData;
1038 
1039  *rank = commContext->rank;
1040 
1041  return OTF2_CALLBACK_SUCCESS;
1042 }
1043 
1044 
1045 static OTF2_CallbackCode
1046 otf2_mpi_collectives_barrier( void* userData,
1047  OTF2_CollectiveContext* commContext )
1048 {
1049  int ret;
1050 
1051  ( void )userData;
1052 
1053  ret = CALL_MPI( MPI_Barrier ) ( commContext->comm );
1054 
1055  return MPI_SUCCESS == ret
1058 }
1059 
1060 
1061 static OTF2_CallbackCode
1062 otf2_mpi_collectives_bcast( void* userData,
1063  OTF2_CollectiveContext* commContext,
1064  void* data,
1065  uint32_t numberElements,
1066  OTF2_Type type,
1067  uint32_t root )
1068 {
1069  int ret;
1070 
1071  ( void )userData;
1072 
1073  ret = CALL_MPI( MPI_Bcast ) ( data,
1074  numberElements,
1075  otf2_mpi_get_type( type ),
1076  root,
1077  commContext->comm );
1078 
1079  return MPI_SUCCESS == ret
1082 }
1083 
1084 
1085 static OTF2_CallbackCode
1086 otf2_mpi_collectives_gather( void* userData,
1087  OTF2_CollectiveContext* commContext,
1088  const void* inData,
1089  void* outData,
1090  uint32_t numberElements,
1091  OTF2_Type type,
1092  uint32_t root )
1093 {
1094  int ret;
1095 
1096  ( void )userData;
1097 
1098  ret = CALL_MPI( MPI_Gather ) ( ( void* )inData,
1099  numberElements,
1100  otf2_mpi_get_type( type ),
1101  outData,
1102  numberElements,
1103  otf2_mpi_get_type( type ),
1104  root,
1105  commContext->comm );
1106 
1107  return MPI_SUCCESS == ret
1110 }
1111 
1112 
1113 static OTF2_CallbackCode
1114 otf2_mpi_collectives_gatherv( void* userData,
1115  OTF2_CollectiveContext* commContext,
1116  const void* inData,
1117  uint32_t inElements,
1118  void* outData,
1119  const uint32_t* outElements,
1120  OTF2_Type type,
1121  uint32_t root )
1122 {
1123  int ret;
1124  int* displs = NULL;
1125 
1126  ( void )userData;
1127 
1128  if ( ( int )root == commContext->rank )
1129  {
1130  int i;
1131  int displ = 0;
1132  for ( i = 0; i < commContext->rank; ++i )
1133  {
1134  commContext->displacements[ i ] = displ;
1135  displ += outElements[ i ];
1136  }
1137  displs = commContext->displacements;
1138  }
1139 
1140  ret = CALL_MPI( MPI_Gatherv ) ( ( void* )inData,
1141  inElements,
1142  otf2_mpi_get_type( type ),
1143  outData,
1144  ( int* )outElements,
1145  displs,
1146  otf2_mpi_get_type( type ),
1147  root,
1148  commContext->comm );
1149 
1150  return MPI_SUCCESS == ret
1153 }
1154 
1155 
1156 static OTF2_CallbackCode
1157 otf2_mpi_collectives_scatter( void* userData,
1158  OTF2_CollectiveContext* commContext,
1159  const void* inData,
1160  void* outData,
1161  uint32_t numberElements,
1162  OTF2_Type type,
1163  uint32_t root )
1164 {
1165  ( void )userData;
1166 
1167  int ret = CALL_MPI( MPI_Scatter ) ( ( void* )inData,
1168  numberElements,
1169  otf2_mpi_get_type( type ),
1170  outData,
1171  numberElements,
1172  otf2_mpi_get_type( type ),
1173  root,
1174  commContext->comm );
1175 
1176  return MPI_SUCCESS == ret
1179 }
1180 
1181 
1182 static OTF2_CallbackCode
1183 otf2_mpi_collectives_scatterv( void* userData,
1184  OTF2_CollectiveContext* commContext,
1185  const void* inData,
1186  const uint32_t* inElements,
1187  void* outData,
1188  uint32_t outElements,
1189  OTF2_Type type,
1190  uint32_t root )
1191 {
1192  int* displs = NULL;
1193 
1194  ( void )userData;
1195 
1196  if ( ( int )root == commContext->rank )
1197  {
1198  int i;
1199  int displ = 0;
1200  for ( i = 0; i < commContext->rank; ++i )
1201  {
1202  commContext->displacements[ i ] = displ;
1203  displ += inElements[ i ];
1204  }
1205  displs = commContext->displacements;
1206  }
1207 
1208  int ret = CALL_MPI( MPI_Scatterv ) ( ( void* )inData,
1209  ( int* )inElements,
1210  displs,
1211  otf2_mpi_get_type( type ),
1212  outData,
1213  outElements,
1214  otf2_mpi_get_type( type ),
1215  root,
1216  commContext->comm );
1217 
1218  return MPI_SUCCESS == ret
1221 }
1222 
1223 
1224 static void
1225 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks )
1226 {
1227  collectiveCallbacks->otf2_release = otf2_mpi_collectives_release;
1228  collectiveCallbacks->otf2_get_size = otf2_mpi_collectives_get_size;
1229  collectiveCallbacks->otf2_get_rank = otf2_mpi_collectives_get_rank;
1230  collectiveCallbacks->otf2_create_local_comm = otf2_mpi_collectives_create_local_comm;
1231  collectiveCallbacks->otf2_free_local_comm = otf2_mpi_collectives_free_local_comm;
1232  collectiveCallbacks->otf2_barrier = otf2_mpi_collectives_barrier;
1233  collectiveCallbacks->otf2_bcast = otf2_mpi_collectives_bcast;
1234  collectiveCallbacks->otf2_gather = otf2_mpi_collectives_gather;
1235  collectiveCallbacks->otf2_gatherv = otf2_mpi_collectives_gatherv;
1236  collectiveCallbacks->otf2_scatter = otf2_mpi_collectives_scatter;
1237  collectiveCallbacks->otf2_scatterv = otf2_mpi_collectives_scatterv;
1238 }
1239 
1240 
1241 #undef CALL_MPI
1242 #ifdef OTF2_MPI_USE_PMPI_undef_me
1243 #undef OTF2_MPI_USE_PMPI
1244 #undef OTF2_MPI_USE_PMPI_undef_me
1245 #endif
1246 
1247 
1253 #endif /* OTF2_MPI_COLLECTIVES_H */
Record reading can continue.
Definition: OTF2_GeneralDefinitions.h:347
Main include file for applications using OTF2.
uint8_t OTF2_Type
Wrapper for enum OTF2_Type_enum.
Definition: OTF2_GeneralDefinitions.h:561
Definition: OTF2_ErrorCodes.h:66
Definition: OTF2_ErrorCodes.h:231
Definition: OTF2_ErrorCodes.h:245
Struct which holds all collective callbacks.
Definition: OTF2_Callbacks.h:490
OTF2_ErrorCode OTF2_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the reader.
OTF2_ErrorCode
Definition: OTF2_ErrorCodes.h:54
OTF2_CallbackCode
Return value to indicate that the record reading should be interrupted.
Definition: OTF2_GeneralDefinitions.h:344
Signaling an error in the callback.
Definition: OTF2_GeneralDefinitions.h:358
struct OTF2_Archive_struct OTF2_Archive
Keeps all meta-data for an OTF2 archive.
Definition: OTF2_Archive.h:205
struct OTF2_Reader_struct OTF2_Reader
Keeps all necessary information for the reader.
Definition: OTF2_Reader.h:190
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, MPI_Comm globalComm, MPI_Comm localComm)
Register a MPI collective context to an OTF2 archive.
static OTF2_ErrorCode OTF2_MPI_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, MPI_Comm globalComm)
Register a MPI collective context to an OTF2 reader.
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacksSplit(OTF2_Archive *archive, MPI_Comm globalComm, uint32_t numberOfFiles)
Register a MPI collective context to an OTF2 archive.
struct OTF2_CollectiveContext OTF2_CollectiveContext
User provided type for collective groups.
Definition: OTF2_Callbacks.h:303
OTF2_ErrorCode OTF2_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the archive.
Definition: OTF2_ErrorCodes.h:247