2.2 (revision 4737)
OTF2_MPI_Collectives.h
Go to the documentation of this file.
1 /*
2  * This file is part of the Score-P software (http://www.score-p.org)
3  *
4  * Copyright (c) 2013-2014,
5  * Technische Universitaet Dresden, Germany
6  *
7  * This software may be modified and distributed under the terms of
8  * a BSD-style license. See the COPYING file in the package base
9  * directory for details.
10  *
11  */
12 
13 
389 #ifndef OTF2_MPI_COLLECTIVES_H
390 #define OTF2_MPI_COLLECTIVES_H
391 
392 
393 #include <otf2/otf2.h>
394 
395 
396 #include <mpi.h>
397 
398 
409 static OTF2_ErrorCode
411  MPI_Comm globalComm,
412  MPI_Comm localComm );
413 
414 
425 static OTF2_ErrorCode
427  MPI_Comm globalComm,
428  uint32_t numberOfFiles );
429 
430 
438 static OTF2_ErrorCode
440  MPI_Comm globalComm );
441 
442 
453 #ifdef OTF2_MPI_USE_PMPI
454 # define CALL_MPI( name ) P ## name
455 #else
456 # define CALL_MPI( name ) name
457 # define OTF2_MPI_USE_PMPI
458 # define OTF2_MPI_USE_PMPI_undef_me
459 #endif
460 
461 
469 #ifndef OTF2_MPI_UINT8_T
470 # if MPI_VERSION >= 3
471 # define OTF2_MPI_UINT8_T MPI_UINT8_T
472 # else
473 # define OTF2_MPI_UINT8_T MPI_UNSIGNED_CHAR
474 # endif
475 #endif
476 
484 #ifndef OTF2_MPI_INT8_T
485 # if MPI_VERSION >= 3
486 # define OTF2_MPI_INT8_T MPI_INT8_T
487 # else
488 # define OTF2_MPI_INT8_T MPI_CHAR
489 # endif
490 #endif
491 
492 
500 #ifndef OTF2_MPI_UINT16_T
501 # if MPI_VERSION >= 3
502 # define OTF2_MPI_UINT16_T MPI_UINT16_T
503 # else
504 # define OTF2_MPI_UINT16_T MPI_UNSIGNED_SHORT
505 # endif
506 #endif
507 
515 #ifndef OTF2_MPI_INT16_T
516 # if MPI_VERSION >= 3
517 # define OTF2_MPI_INT16_T MPI_INT16_T
518 # else
519 # define OTF2_MPI_INT16_T MPI_SHORT
520 # endif
521 #endif
522 
523 
531 #ifndef OTF2_MPI_UINT32_T
532 # if MPI_VERSION >= 3
533 # define OTF2_MPI_UINT32_T MPI_UINT32_T
534 # else
535 # define OTF2_MPI_UINT32_T MPI_UNSIGNED
536 # endif
537 #endif
538 
546 #ifndef OTF2_MPI_INT32_T
547 # if MPI_VERSION >= 3
548 # define OTF2_MPI_INT32_T MPI_INT32_T
549 # else
550 # define OTF2_MPI_INT32_T MPI_INT
551 # endif
552 #endif
553 
554 
562 #ifndef OTF2_MPI_UINT64_T
563 # define OTF2_MPI_UINT64_T MPI_UINT64_T
564 # if MPI_VERSION < 3
565 # error Please define OTF2_MPI_UINT64_T to a suitable MPI datatype for uint64_t.
566 # endif
567 #endif
568 
576 #ifndef OTF2_MPI_INT64_T
577 # define OTF2_MPI_INT64_T MPI_INT64_T
578 # if MPI_VERSION < 3
579 # error Please define OTF2_MPI_INT64 to a suitable MPI datatype for int64_t.
580 # endif
581 #endif
582 
583 
590 #ifndef OTF2_MPI_FLOAT
591 # define OTF2_MPI_FLOAT MPI_FLOAT
592 #endif
593 
594 
601 #ifndef OTF2_MPI_DOUBLE
602 # define OTF2_MPI_DOUBLE MPI_DOUBLE
603 #endif
604 
605 
614 {
615  MPI_Comm comm;
616  int size;
617  int rank;
618  int displacements[ 1 ];
619 };
620 
621 
624 typedef struct OTF2_MPI_UserData
625 {
626  OTF2_CollectiveCallbacks callbacks;
627  OTF2_CollectiveContext* global;
628  OTF2_CollectiveContext* local;
629 } OTF2_MPI_UserData;
630 
631 
632 static void
633 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks );
634 
635 
637 otf2_mpi_create_context( MPI_Comm comm,
638  bool duplicate );
639 
640 
641 static void
642 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext );
643 
644 
646 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
647  uint32_t numberOfFiles );
648 
649 
650 static OTF2_ErrorCode
652  MPI_Comm globalComm,
653  MPI_Comm localComm )
654 {
655  OTF2_ErrorCode status = OTF2_SUCCESS;
656  OTF2_MPI_UserData* user_data = NULL;
657 
660 
661  if ( !archive )
662  {
664  }
665 
666  if ( MPI_COMM_NULL == globalComm )
667  {
669  }
670 
671  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
672  if ( !user_data )
673  {
675  }
676 
677  otf2_mpi_get_collectives( &user_data->callbacks );
678 
679  user_data->global = otf2_mpi_create_context( globalComm, true );
680  if ( !user_data->global )
681  {
683  goto out;
684  }
685 
686  if ( MPI_COMM_NULL != localComm )
687  {
688  user_data->local = otf2_mpi_create_context( localComm, true );
689  if ( !user_data->local )
690  {
692  goto out;
693  }
694  }
695 
696  status = OTF2_Archive_SetCollectiveCallbacks( archive,
697  &user_data->callbacks,
698  user_data,
699  user_data->global,
700  user_data->local );
701 
702 out:
703  if ( OTF2_SUCCESS != status )
704  {
705  otf2_mpi_destroy_context( user_data->local );
706  otf2_mpi_destroy_context( user_data->global );
707  free( user_data );
708  }
709 
710  return status;
711 }
712 
713 
714 static OTF2_ErrorCode
716  MPI_Comm globalComm,
717  uint32_t numberOfFiles )
718 {
719  OTF2_ErrorCode status = OTF2_SUCCESS;
720  OTF2_MPI_UserData* user_data = NULL;
721 
724 
725  if ( !archive )
726  {
728  }
729 
730  if ( MPI_COMM_NULL == globalComm )
731  {
733  }
734 
735  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
736  if ( !user_data )
737  {
739  }
740 
741  otf2_mpi_get_collectives( &user_data->callbacks );
742 
743  user_data->global = otf2_mpi_create_context( globalComm, true );
744  if ( !user_data->global )
745  {
747  goto out;
748  }
749 
750  user_data->local = otf2_mpi_split_context_by_number( user_data->global,
751  numberOfFiles );
752  if ( !user_data->local )
753  {
755  goto out;
756  }
757 
758  status = OTF2_Archive_SetCollectiveCallbacks( archive,
759  &user_data->callbacks,
760  user_data,
761  user_data->global,
762  user_data->local );
763 
764 out:
765  if ( OTF2_SUCCESS != status )
766  {
767  otf2_mpi_destroy_context( user_data->local );
768  otf2_mpi_destroy_context( user_data->global );
769  free( user_data );
770  }
771 
772  return status;
773 }
774 
775 
776 static OTF2_ErrorCode
778  MPI_Comm globalComm )
779 {
780  OTF2_ErrorCode status = OTF2_SUCCESS;
781  OTF2_MPI_UserData* user_data = NULL;
782 
785 
786  if ( !reader )
787  {
789  }
790 
791  if ( MPI_COMM_NULL == globalComm )
792  {
794  }
795 
796  user_data = ( OTF2_MPI_UserData* )calloc( 1, sizeof( *user_data ) );
797  if ( !user_data )
798  {
800  }
801 
802  otf2_mpi_get_collectives( &user_data->callbacks );
803 
804  user_data->global = otf2_mpi_create_context( globalComm, true );
805  if ( !user_data->global )
806  {
808  goto out;
809  }
810 
811  status = OTF2_Reader_SetCollectiveCallbacks( reader,
812  &user_data->callbacks,
813  user_data,
814  user_data->global,
815  NULL );
816 
817 out:
818  if ( OTF2_SUCCESS != status )
819  {
820  otf2_mpi_destroy_context( user_data->global );
821  free( user_data );
822  }
823 
824  return status;
825 }
826 
827 
829 otf2_mpi_create_context( MPI_Comm comm,
830  bool duplicate )
831 {
832  int ret;
833  int size;
834 
835  ret = CALL_MPI( MPI_Comm_size ) ( comm, &size );
836  if ( MPI_SUCCESS != ret )
837  {
838  return NULL;
839  }
840 
841  OTF2_CollectiveContext* new_context =
842  ( OTF2_CollectiveContext* )malloc( sizeof( *new_context )
843  + ( ( size - 1 ) * sizeof( int ) ) );
844  if ( !new_context )
845  {
846  return NULL;
847  }
848 
849  new_context->size = size;
850  ret = CALL_MPI( MPI_Comm_rank ) ( comm, &new_context->rank );
851  if ( MPI_SUCCESS != ret )
852  {
853  free( new_context );
854  return NULL;
855  }
856 
857  if ( duplicate )
858  {
859  ret = CALL_MPI( MPI_Comm_dup ) ( comm, &new_context->comm );
860  if ( MPI_SUCCESS != ret )
861  {
862  free( new_context );
863  return NULL;
864  }
865  }
866  else
867  {
868  new_context->comm = comm;
869  }
870 
871  return new_context;
872 }
873 
874 
875 static void
876 otf2_mpi_destroy_context( OTF2_CollectiveContext* collectiveContext )
877 {
878  if ( !collectiveContext )
879  {
880  return;
881  }
882 
883  CALL_MPI( MPI_Comm_free ) ( &collectiveContext->comm );
884 
885  free( collectiveContext );
886 }
887 
888 
890 otf2_mpi_split_context( OTF2_CollectiveContext* commContext,
891  int color,
892  int key )
893 {
894  OTF2_CollectiveContext* new_context;
895  MPI_Comm new_comm;
896  int ret;
897  ret = CALL_MPI( MPI_Comm_split ) ( commContext->comm,
898  color,
899  key,
900  &new_comm );
901  if ( MPI_SUCCESS != ret )
902  {
903  return NULL;
904  }
905 
906  new_context = otf2_mpi_create_context( new_comm, false );
907  if ( !new_context )
908  {
909  CALL_MPI( MPI_Comm_free ) ( &new_comm );
910  return NULL;
911  }
912 
913  return new_context;
914 }
915 
916 
918 otf2_mpi_split_context_by_number( OTF2_CollectiveContext* commContext,
919  uint32_t numberOfFiles )
920 {
921  int file_number = 0;
922  int rem = commContext->size % numberOfFiles;
923  int local_size = commContext->size / numberOfFiles + !!rem;
924  int local_rank = 0;
925  int local_root = 0;
926  int i;
927  for ( i = 0; i < commContext->rank; i++ )
928  {
929  local_rank++;
930  if ( local_root + local_size == i + 1 )
931  {
932  local_root += local_size;
933  file_number++;
934  local_size -= file_number == rem;
935  local_rank = 0;
936  }
937  }
938 
939  return otf2_mpi_split_context( commContext,
940  file_number,
941  local_rank );
942 }
943 
944 
945 static MPI_Datatype
946 otf2_mpi_get_type( OTF2_Type type )
947 {
948 #define case_return( TYPE, MPI_SUFFIX ) \
949  case OTF2_TYPE_ ## TYPE: \
950  return OTF2_MPI_ ## TYPE ## MPI_SUFFIX
951  switch ( type )
952  {
953  case_return( UINT8, _T );
954  case_return( INT8, _T );
955  case_return( UINT16, _T );
956  case_return( INT16, _T );
957  case_return( UINT32, _T );
958  case_return( INT32, _T );
959  case_return( UINT64, _T );
960  case_return( INT64, _T );
961  case_return( FLOAT, );
962  case_return( DOUBLE, );
963  default:
964  return MPI_DATATYPE_NULL;
965  }
966 #undef case_return
967 }
968 
969 
970 static void
971 otf2_mpi_collectives_release( void* userData,
972  OTF2_CollectiveContext* globalCommContext,
973  OTF2_CollectiveContext* localCommContext )
974 {
975  OTF2_MPI_UserData* user_data = ( OTF2_MPI_UserData* )userData;
976 
977  ( void )globalCommContext;
978  ( void )localCommContext;
979 
980  otf2_mpi_destroy_context( user_data->global );
981  otf2_mpi_destroy_context( user_data->local );
982  free( user_data );
983 }
984 
985 
986 static OTF2_CallbackCode
987 otf2_mpi_collectives_create_local_comm( void* userData,
988  OTF2_CollectiveContext** localCommContextOut,
989  OTF2_CollectiveContext* globalCommContext,
990  uint32_t globalRank,
991  uint32_t globalSize,
992  uint32_t localRank,
993  uint32_t localSize,
994  uint32_t fileNumber,
995  uint32_t numberOfFiles )
996 {
997  ( void )userData;
998  ( void )globalRank;
999  ( void )globalSize;
1000  ( void )localSize;
1001  ( void )numberOfFiles;
1002 
1003  *localCommContextOut = otf2_mpi_split_context( globalCommContext,
1004  fileNumber,
1005  localRank );
1006 
1007  return *localCommContextOut
1010 }
1011 
1012 
1013 static OTF2_CallbackCode
1014 otf2_mpi_collectives_free_local_comm( void* userData,
1015  OTF2_CollectiveContext* localCommContext )
1016 {
1017  ( void )userData;
1018 
1019  otf2_mpi_destroy_context( localCommContext );
1020 
1021  return OTF2_CALLBACK_SUCCESS;
1022 }
1023 
1024 
1025 static OTF2_CallbackCode
1026 otf2_mpi_collectives_get_size( void* userData,
1027  OTF2_CollectiveContext* commContext,
1028  uint32_t* size )
1029 {
1030  ( void )userData;
1031 
1032  *size = commContext->size;
1033 
1034  return OTF2_CALLBACK_SUCCESS;
1035 }
1036 
1037 
1038 static OTF2_CallbackCode
1039 otf2_mpi_collectives_get_rank( void* userData,
1040  OTF2_CollectiveContext* commContext,
1041  uint32_t* rank )
1042 {
1043  ( void )userData;
1044 
1045  *rank = commContext->rank;
1046 
1047  return OTF2_CALLBACK_SUCCESS;
1048 }
1049 
1050 
1051 static OTF2_CallbackCode
1052 otf2_mpi_collectives_barrier( void* userData,
1053  OTF2_CollectiveContext* commContext )
1054 {
1055  int ret;
1056 
1057  ( void )userData;
1058 
1059  ret = CALL_MPI( MPI_Barrier ) ( commContext->comm );
1060 
1061  return MPI_SUCCESS == ret
1064 }
1065 
1066 
1067 static OTF2_CallbackCode
1068 otf2_mpi_collectives_bcast( void* userData,
1069  OTF2_CollectiveContext* commContext,
1070  void* data,
1071  uint32_t numberElements,
1072  OTF2_Type type,
1073  uint32_t root )
1074 {
1075  int ret;
1076 
1077  ( void )userData;
1078 
1079  ret = CALL_MPI( MPI_Bcast ) ( data,
1080  numberElements,
1081  otf2_mpi_get_type( type ),
1082  root,
1083  commContext->comm );
1084 
1085  return MPI_SUCCESS == ret
1088 }
1089 
1090 
1091 static OTF2_CallbackCode
1092 otf2_mpi_collectives_gather( void* userData,
1093  OTF2_CollectiveContext* commContext,
1094  const void* inData,
1095  void* outData,
1096  uint32_t numberElements,
1097  OTF2_Type type,
1098  uint32_t root )
1099 {
1100  int ret;
1101 
1102  ( void )userData;
1103 
1104  ret = CALL_MPI( MPI_Gather ) ( ( void* )inData,
1105  numberElements,
1106  otf2_mpi_get_type( type ),
1107  outData,
1108  numberElements,
1109  otf2_mpi_get_type( type ),
1110  root,
1111  commContext->comm );
1112 
1113  return MPI_SUCCESS == ret
1116 }
1117 
1118 
1119 static OTF2_CallbackCode
1120 otf2_mpi_collectives_gatherv( void* userData,
1121  OTF2_CollectiveContext* commContext,
1122  const void* inData,
1123  uint32_t inElements,
1124  void* outData,
1125  const uint32_t* outElements,
1126  OTF2_Type type,
1127  uint32_t root )
1128 {
1129  int ret;
1130  int* displs = NULL;
1131 
1132  ( void )userData;
1133 
1134  if ( ( int )root == commContext->rank )
1135  {
1136  int i;
1137  int displ = 0;
1138  for ( i = 0; i < commContext->rank; ++i )
1139  {
1140  commContext->displacements[ i ] = displ;
1141  displ += outElements[ i ];
1142  }
1143  displs = commContext->displacements;
1144  }
1145 
1146  ret = CALL_MPI( MPI_Gatherv ) ( ( void* )inData,
1147  inElements,
1148  otf2_mpi_get_type( type ),
1149  outData,
1150  ( int* )outElements,
1151  displs,
1152  otf2_mpi_get_type( type ),
1153  root,
1154  commContext->comm );
1155 
1156  return MPI_SUCCESS == ret
1159 }
1160 
1161 
1162 static OTF2_CallbackCode
1163 otf2_mpi_collectives_scatter( void* userData,
1164  OTF2_CollectiveContext* commContext,
1165  const void* inData,
1166  void* outData,
1167  uint32_t numberElements,
1168  OTF2_Type type,
1169  uint32_t root )
1170 {
1171  ( void )userData;
1172 
1173  int ret = CALL_MPI( MPI_Scatter ) ( ( void* )inData,
1174  numberElements,
1175  otf2_mpi_get_type( type ),
1176  outData,
1177  numberElements,
1178  otf2_mpi_get_type( type ),
1179  root,
1180  commContext->comm );
1181 
1182  return MPI_SUCCESS == ret
1185 }
1186 
1187 
1188 static OTF2_CallbackCode
1189 otf2_mpi_collectives_scatterv( void* userData,
1190  OTF2_CollectiveContext* commContext,
1191  const void* inData,
1192  const uint32_t* inElements,
1193  void* outData,
1194  uint32_t outElements,
1195  OTF2_Type type,
1196  uint32_t root )
1197 {
1198  int* displs = NULL;
1199 
1200  ( void )userData;
1201 
1202  if ( ( int )root == commContext->rank )
1203  {
1204  int i;
1205  int displ = 0;
1206  for ( i = 0; i < commContext->rank; ++i )
1207  {
1208  commContext->displacements[ i ] = displ;
1209  displ += inElements[ i ];
1210  }
1211  displs = commContext->displacements;
1212  }
1213 
1214  int ret = CALL_MPI( MPI_Scatterv ) ( ( void* )inData,
1215  ( int* )inElements,
1216  displs,
1217  otf2_mpi_get_type( type ),
1218  outData,
1219  outElements,
1220  otf2_mpi_get_type( type ),
1221  root,
1222  commContext->comm );
1223 
1224  return MPI_SUCCESS == ret
1227 }
1228 
1229 
1230 static void
1231 otf2_mpi_get_collectives( OTF2_CollectiveCallbacks* collectiveCallbacks )
1232 {
1233  collectiveCallbacks->otf2_release = otf2_mpi_collectives_release;
1234  collectiveCallbacks->otf2_get_size = otf2_mpi_collectives_get_size;
1235  collectiveCallbacks->otf2_get_rank = otf2_mpi_collectives_get_rank;
1236  collectiveCallbacks->otf2_create_local_comm = otf2_mpi_collectives_create_local_comm;
1237  collectiveCallbacks->otf2_free_local_comm = otf2_mpi_collectives_free_local_comm;
1238  collectiveCallbacks->otf2_barrier = otf2_mpi_collectives_barrier;
1239  collectiveCallbacks->otf2_bcast = otf2_mpi_collectives_bcast;
1240  collectiveCallbacks->otf2_gather = otf2_mpi_collectives_gather;
1241  collectiveCallbacks->otf2_gatherv = otf2_mpi_collectives_gatherv;
1242  collectiveCallbacks->otf2_scatter = otf2_mpi_collectives_scatter;
1243  collectiveCallbacks->otf2_scatterv = otf2_mpi_collectives_scatterv;
1244 }
1245 
1246 
1247 #undef CALL_MPI
1248 #ifdef OTF2_MPI_USE_PMPI_undef_me
1249 #undef OTF2_MPI_USE_PMPI
1250 #undef OTF2_MPI_USE_PMPI_undef_me
1251 #endif
1252 
1253 
1259 #endif /* OTF2_MPI_COLLECTIVES_H */
Record reading can continue.
Definition: OTF2_GeneralDefinitions.h:347
Main include file for applications using OTF2.
uint8_t OTF2_Type
Wrapper for enum OTF2_Type_enum.
Definition: OTF2_GeneralDefinitions.h:561
Definition: OTF2_ErrorCodes.h:66
Definition: OTF2_ErrorCodes.h:231
Definition: OTF2_ErrorCodes.h:245
Struct which holds all collective callbacks.
Definition: OTF2_Callbacks.h:490
OTF2_ErrorCode OTF2_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the reader.
OTF2_ErrorCode
Definition: OTF2_ErrorCodes.h:54
OTF2_CallbackCode
Return value to indicate that the record reading should be interrupted.
Definition: OTF2_GeneralDefinitions.h:344
Signaling an error in the callback.
Definition: OTF2_GeneralDefinitions.h:358
struct OTF2_Archive_struct OTF2_Archive
Keeps all meta-data for an OTF2 archive.
Definition: OTF2_Archive.h:210
struct OTF2_Reader_struct OTF2_Reader
Keeps all necessary information for the reader.
Definition: OTF2_Reader.h:190
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, MPI_Comm globalComm, MPI_Comm localComm)
Register a MPI collective context to an OTF2 archive.
static OTF2_ErrorCode OTF2_MPI_Reader_SetCollectiveCallbacks(OTF2_Reader *reader, MPI_Comm globalComm)
Register a MPI collective context to an OTF2 reader.
static OTF2_ErrorCode OTF2_MPI_Archive_SetCollectiveCallbacksSplit(OTF2_Archive *archive, MPI_Comm globalComm, uint32_t numberOfFiles)
Register a MPI collective context to an OTF2 archive.
struct OTF2_CollectiveContext OTF2_CollectiveContext
User provided type for collective groups.
Definition: OTF2_Callbacks.h:303
OTF2_ErrorCode OTF2_Archive_SetCollectiveCallbacks(OTF2_Archive *archive, const OTF2_CollectiveCallbacks *collectiveCallbacks, void *collectiveData, OTF2_CollectiveContext *globalCommContext, OTF2_CollectiveContext *localCommContext)
Set the collective callbacks for the archive.
Definition: OTF2_ErrorCodes.h:247