Skip to content

Commit 98dec89

Browse files
committed
WIP
1 parent cb5184f commit 98dec89

File tree

4 files changed

+31
-24
lines changed

4 files changed

+31
-24
lines changed

apps/nccl/src/allgather.cu

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,9 @@ std::shared_ptr<mscclpp::Algorithm> AllgatherAlgo6::build() {
116116
[self](std::shared_ptr<mscclpp::Communicator> comm) { self->initialize(comm); },
117117
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,
118118
[[maybe_unused]] size_t outputSize, [[maybe_unused]] mscclpp::DataType dtype, cudaStream_t stream,
119-
std::unordered_map<std::string, uintptr_t>& extras) {
120-
return self->allgatherKernelFunc(ctx, input, output, inputSize, stream, extras);
119+
std::unordered_map<std::string, uintptr_t>& extras) -> mscclpp::CommResult {
120+
ncclResult_t res = self->allgatherKernelFunc(ctx, input, output, inputSize, stream, extras);
121+
return res == ncclSuccess ? mscclpp::CommResult::commSuccess : mscclpp::CommResult::commInternalError;
121122
},
122123
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t inputSize,
123124
[[maybe_unused]] size_t outputSize,
@@ -195,8 +196,9 @@ std::shared_ptr<mscclpp::Algorithm> AllgatherAlgo8::build() {
195196
[self](std::shared_ptr<mscclpp::Communicator> comm) { self->initialize(comm); },
196197
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,
197198
[[maybe_unused]] size_t outputSize, [[maybe_unused]] mscclpp::DataType dtype, cudaStream_t stream,
198-
std::unordered_map<std::string, uintptr_t>& extras) {
199-
return self->allgatherKernelFunc(ctx, input, output, inputSize, stream, extras);
199+
std::unordered_map<std::string, uintptr_t>& extras) -> mscclpp::CommResult {
200+
ncclResult_t res = self->allgatherKernelFunc(ctx, input, output, inputSize, stream, extras);
201+
return res == ncclSuccess ? mscclpp::CommResult::commSuccess : mscclpp::CommResult::commInternalError;
200202
},
201203
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t inputSize,
202204
[[maybe_unused]] size_t outputSize,

apps/nccl/src/allreduce.cu

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -315,8 +315,9 @@ std::shared_ptr<mscclpp::Algorithm> AllreducePacket::build() {
315315
[self](std::shared_ptr<mscclpp::Communicator> comm) { self->initialize(comm); },
316316
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,
317317
[[maybe_unused]] size_t outputSize, mscclpp::DataType dtype, cudaStream_t stream,
318-
std::unordered_map<std::string, uintptr_t>& extras) {
319-
return self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
318+
std::unordered_map<std::string, uintptr_t>& extras) -> mscclpp::CommResult {
319+
ncclResult_t res = self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
320+
return res == ncclSuccess ? mscclpp::CommResult::commSuccess : mscclpp::CommResult::commInternalError;
320321
},
321322
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t inputSize,
322323
[[maybe_unused]] size_t outputSize,
@@ -409,8 +410,9 @@ std::shared_ptr<mscclpp::Algorithm> AllreduceNvls::build() {
409410
[self](std::shared_ptr<mscclpp::Communicator> comm) { self->initialize(comm); },
410411
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,
411412
[[maybe_unused]] size_t outputSize, mscclpp::DataType dtype, cudaStream_t stream,
412-
std::unordered_map<std::string, uintptr_t>& extras) {
413-
return self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
413+
std::unordered_map<std::string, uintptr_t>& extras) -> mscclpp::CommResult {
414+
ncclResult_t res = self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
415+
return res == ncclSuccess ? mscclpp::CommResult::commSuccess : mscclpp::CommResult::commInternalError;
414416
},
415417
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t inputSize,
416418
[[maybe_unused]] size_t outputSize,
@@ -478,8 +480,9 @@ std::shared_ptr<mscclpp::Algorithm> AllreduceNvlsWithCopy::build() {
478480
[self](std::shared_ptr<mscclpp::Communicator> comm) { self->initialize(comm); },
479481
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,
480482
[[maybe_unused]] size_t outputSize, mscclpp::DataType dtype, cudaStream_t stream,
481-
std::unordered_map<std::string, uintptr_t>& extras) {
482-
return self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
483+
std::unordered_map<std::string, uintptr_t>& extras) -> mscclpp::CommResult {
484+
ncclResult_t res = self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
485+
return res == ncclSuccess ? mscclpp::CommResult::commSuccess : mscclpp::CommResult::commInternalError;
483486
},
484487
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t inputSize,
485488
[[maybe_unused]] size_t outputSize,
@@ -580,8 +583,9 @@ std::shared_ptr<mscclpp::Algorithm> Allreduce8::build() {
580583
[self](std::shared_ptr<mscclpp::Communicator> comm) { self->initialize(comm); },
581584
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,
582585
[[maybe_unused]] size_t outputSize, mscclpp::DataType dtype, cudaStream_t stream,
583-
std::unordered_map<std::string, uintptr_t>& extras) {
584-
return self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
586+
std::unordered_map<std::string, uintptr_t>& extras) -> mscclpp::CommResult {
587+
ncclResult_t res = self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
588+
return res == ncclSuccess ? mscclpp::CommResult::commSuccess : mscclpp::CommResult::commInternalError;
585589
},
586590
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t inputSize,
587591
[[maybe_unused]] size_t outputSize,
@@ -649,7 +653,8 @@ std::shared_ptr<mscclpp::Algorithm> AllreduceNvlsPacket::build() {
649653
[self](const std::shared_ptr<mscclpp::AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,
650654
[[maybe_unused]] size_t outputSize, mscclpp::DataType dtype, cudaStream_t stream,
651655
std::unordered_map<std::string, uintptr_t>& extras) {
652-
return self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
656+
ncclResult_t res = self->allreduceKernelFunc(ctx, input, output, inputSize, dtype, stream, extras);
657+
return res == ncclSuccess ? mscclpp::CommResult::commSuccess : mscclpp::CommResult::commInternalError;
653658
},
654659
[self](std::shared_ptr<mscclpp::Communicator> comm, const void* input, void* output, size_t inputSize,
655660
[[maybe_unused]] size_t outputSize,

src/algorithms/allreduce/allreduce_allpair_packet.cu

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,15 @@ inline std::pair<int, int> getDefaultBlockNumAndThreadNum(size_t inputSize, int
7575
template <Op OpType, typename T>
7676
struct AllpairAdapter {
7777
static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*,
78-
DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*,
79-
size_t channelInOffset, size_t, size_t scratchBufferSize, int rank, int nRanksPerNode,
80-
int worldSize, size_t inputSize, cudaStream_t stream, LL8Packet* flags,
81-
uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) {
78+
DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*, size_t channelInOffset, size_t,
79+
size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize,
80+
cudaStream_t stream, LL8Packet* flags, uint32_t numScratchBuff, int nBlocks = 0,
81+
int nThreadsPerBlock = 0) {
8282
using ChannelType = DeviceHandle<MemoryChannel>;
8383
const size_t nelems = inputSize / sizeof(T);
8484
allreduceAllPairs<OpType><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
8585
(T*)buff, (T*)scratch, (T*)resultBuff, (ChannelType*)memoryChannels, channelInOffset, scratchBufferSize, rank,
86-
nRanksPerNode, worldSize, nelems, flags, numScratchBuff);
86+
nRanksPerNode, worldSize, nelems, numScratchBuff, flags);
8787
return cudaGetLastError();
8888
}
8989
};
@@ -160,7 +160,7 @@ AlgorithmCtxKey AllreduceAllpairPacket::generateAllreduceContextKey(const void*
160160
}
161161

162162
std::shared_ptr<Algorithm> AllreduceAllpairPacket::build() {
163-
auto self = std::make_shared<AllreduceAllpairPacket>(scratchBuffer_, scratchBufferSize_);
163+
auto self = std::make_shared<AllreduceAllpairPacket>(reinterpret_cast<uintptr_t>(scratchBuffer_), scratchBufferSize_);
164164
return std::make_shared<NativeAlgorithm>(
165165
"default_allreduce_allpair_packet", "allreduce",
166166
[self](std::shared_ptr<Communicator> comm) { self->initialize(comm); },

src/algorithms/allreduce/allreduce_packet.cu

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,10 @@ __global__ void __launch_bounds__(1024, 1)
148148
template <Op OpType, typename T>
149149
struct PacketAdapter {
150150
static cudaError_t call(const void* buff, void* scratch, void* resultBuff, void* memoryChannels, void*,
151-
DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*,
152-
size_t channelInOffset, size_t, size_t scratchBufferSize, int rank, int nRanksPerNode,
153-
int worldSize, size_t inputSize, cudaStream_t stream, LL8Packet* flags,
154-
uint32_t numScratchBuff, int nBlocks = 0, int nThreadsPerBlock = 0) {
151+
DeviceHandle<SwitchChannel>*, DeviceHandle<SwitchChannel>*, size_t channelInOffset, size_t,
152+
size_t scratchBufferSize, int rank, int nRanksPerNode, int worldSize, size_t inputSize,
153+
cudaStream_t stream, LL8Packet* flags, uint32_t numScratchBuff, int nBlocks = 0,
154+
int nThreadsPerBlock = 0) {
155155
using ChannelType = DeviceHandle<MemoryChannel>;
156156
const size_t nelems = inputSize / sizeof(T);
157157
allreducePacket<OpType><<<nBlocks, nThreadsPerBlock, 0, stream>>>(
@@ -240,7 +240,7 @@ AlgorithmCtxKey AllreducePacket::generateAllreduceContextKey(const void* input,
240240
}
241241

242242
std::shared_ptr<Algorithm> AllreducePacket::build() {
243-
auto self = std::make_shared<AllreducePacket>(scratchBuffer_, scratchBufferSize_);
243+
auto self = std::make_shared<AllreducePacket>(reinterpret_cast<uintptr_t>(scratchBuffer_), scratchBufferSize_);
244244
return std::make_shared<NativeAlgorithm>(
245245
"default_allreduce_packet", "allreduce", [self](std::shared_ptr<Communicator> comm) { self->initialize(comm); },
246246
[self](const std::shared_ptr<AlgorithmCtx> ctx, const void* input, void* output, size_t inputSize,

0 commit comments

Comments
 (0)