Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/tutorials/04-port-channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ If your bootstrap IP address is not on the default network interface of your nod

### What's Happening in Terms of InfiniBand?

When we use InfiniBand transport, each `Endpoint` holds a unique InfiniBand context, and each `Connection` holds a unique InfiniBand queue pair (QP). Therefore, multiple `Semaphore`s and `PortChannel`s will share the same QP if they are created out of the same `Connection`. If you want multiple QPs between two endpoints, you need to create multiple parallel `Connection`s, and then create `Semaphore`s and `PortChannel`s from different `Connection`s.
When we use InfiniBand transport, each `Connection` holds a unique InfiniBand queue pair (QP). Therefore, multiple `Semaphore`s and `PortChannel`s will share the same QP if they are created out of the same `Connection`. If you want multiple QPs between two endpoints, you need to create multiple parallel `Connection`s, and then create `Semaphore`s and `PortChannel`s from different `Connection`s.

The `PortChannel` methods would have the following behavior in terms of InfiniBand operations:
- `put()`: Posts an RDMA Write operation to the QP to transfer data.
Expand Down
23 changes: 20 additions & 3 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,19 @@ struct EndpointConfig {
/// These settings are only used when the transport is an InfiniBand type (IB0-IB7); they are ignored for other
/// transports.
struct Ib {
static const int DefaultPort = -1;
static const int DefaultGidIndex = 0;
static const int DefaultMaxCqSize = 1024;
static const int DefaultMaxCqPollNum = 1;
static const int DefaultMaxSendWr = 8192;
static const int DefaultMaxWrPerSend = 64;

/// Device index. Currently ignored; use transport type (IB0-IB7) to select device.
int deviceIndex;
/// Port number.
int port;
/// GID index.
int gidIndex;
/// Maximum size of the completion queue.
int maxCqSize;
/// Maximum number of completion queue polls per operation.
Expand All @@ -396,13 +404,22 @@ struct EndpointConfig {
int maxWrPerSend;

/// Constructor.
/// @param deviceIndex Device index.
/// @param port Port number.
/// @param gidIndex GID index.
/// @param maxCqSize Maximum completion queue size.
/// @param maxCqPollNum Maximum completion queue poll count.
/// @param maxSendWr Maximum outstanding send work requests.
/// @param maxWrPerSend Maximum work requests per send operation.
Ib(int maxCqSize = DefaultMaxCqSize, int maxCqPollNum = DefaultMaxCqPollNum, int maxSendWr = DefaultMaxSendWr,
int maxWrPerSend = DefaultMaxWrPerSend)
: maxCqSize(maxCqSize), maxCqPollNum(maxCqPollNum), maxSendWr(maxSendWr), maxWrPerSend(maxWrPerSend) {}
Ib(int deviceIndex = -1, int port = DefaultPort, int gidIndex = DefaultGidIndex, int maxCqSize = DefaultMaxCqSize,
int maxCqPollNum = DefaultMaxCqPollNum, int maxSendWr = DefaultMaxSendWr, int maxWrPerSend = DefaultMaxWrPerSend)
: deviceIndex(deviceIndex),
port(port),
gidIndex(gidIndex),
maxCqSize(maxCqSize),
maxCqPollNum(maxCqPollNum),
maxSendWr(maxSendWr),
maxWrPerSend(maxWrPerSend) {}
};

/// Communication transport type (e.g., CudaIpc, IB0-IB7, Ethernet).
Expand Down
4 changes: 4 additions & 0 deletions include/mscclpp/env.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class Env {
/// devices automatically.
const std::string hcaDevices;

/// Env name: `MSCCLPP_IBV_SO`. The path to the libibverbs shared library to use. If unset, it will use the
/// default libibverbs library found in the system.
const std::string ibvSo;

/// Env name: `MSCCLPP_HOSTID`. A string that uniquely identifies the host. If unset, it will use the hostname.
/// This is used to determine whether the host is the same across different processes.
const std::string hostid;
Expand Down
17 changes: 16 additions & 1 deletion python/csrc/core_py.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,16 @@ void register_core(nb::module_& m) {

nb::class_<EndpointConfig::Ib>(m, "EndpointConfigIb")
.def(nb::init<>())
.def(nb::init<int, int, int, int>(), nb::arg("max_cq_size") = EndpointConfig::Ib::DefaultMaxCqSize,
.def(nb::init<int, int, int, int, int, int, int>(), nb::arg("device_index") = -1,
nb::arg("port") = EndpointConfig::Ib::DefaultPort,
nb::arg("gid_index") = EndpointConfig::Ib::DefaultGidIndex,
nb::arg("max_cq_size") = EndpointConfig::Ib::DefaultMaxCqSize,
nb::arg("max_cq_poll_num") = EndpointConfig::Ib::DefaultMaxCqPollNum,
nb::arg("max_send_wr") = EndpointConfig::Ib::DefaultMaxSendWr,
nb::arg("max_wr_per_send") = EndpointConfig::Ib::DefaultMaxWrPerSend)
.def_rw("device_index", &EndpointConfig::Ib::deviceIndex)
.def_rw("port", &EndpointConfig::Ib::port)
.def_rw("gid_index", &EndpointConfig::Ib::gidIndex)
.def_rw("max_cq_size", &EndpointConfig::Ib::maxCqSize)
.def_rw("max_cq_poll_num", &EndpointConfig::Ib::maxCqPollNum)
.def_rw("max_send_wr", &EndpointConfig::Ib::maxSendWr)
Expand Down Expand Up @@ -176,6 +182,15 @@ void register_core(nb::module_& m) {
.def_rw("transport", &EndpointConfig::transport)
.def_rw("device", &EndpointConfig::device)
.def_rw("ib", &EndpointConfig::ib)
.def_prop_rw(
"ib_device_index", [](EndpointConfig& self) { return self.ib.deviceIndex; },
[](EndpointConfig& self, int v) { self.ib.deviceIndex = v; })
.def_prop_rw(
"ib_port", [](EndpointConfig& self) { return self.ib.port; },
[](EndpointConfig& self, int v) { self.ib.port = v; })
.def_prop_rw(
"ib_gid_index", [](EndpointConfig& self) { return self.ib.gidIndex; },
[](EndpointConfig& self, int v) { self.ib.gidIndex = v; })
Comment on lines +185 to +193
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming convention is inconsistent: the C++ struct uses camelCase (deviceIndex, gidIndex) but the Python property names use snake_case (ib_device_index, ib_gid_index). While the def_rw attributes correctly use camelCase to match the C++ members, the def_prop_rw properties add an "ib_" prefix and use snake_case. This creates two different ways to access the same fields in Python, which could be confusing. Consider using a consistent naming convention.

Copilot uses AI. Check for mistakes.
.def_prop_rw(
"ib_max_cq_size", [](EndpointConfig& self) { return self.ib.maxCqSize; },
[](EndpointConfig& self, int v) { self.ib.maxCqSize = v; })
Expand Down
6 changes: 3 additions & 3 deletions python/test/_cpp/proxy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ class MyProxyService {
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }) {
allRegMem_.reserve(allRegMemList.size());
for (size_t i = 0; i < allRegMemList.size(); ++i) {
auto& regMem = nb::cast<const mscclpp::RegisteredMemory&>(allRegMemList[i]);
allRegMem_.push_back(regMem);
auto regMem = nb::cast<mscclpp::RegisteredMemory>(allRegMemList[i]);
allRegMem_.emplace_back(regMem);
}
semaphores_.reserve(semaphoreList.size());
for (size_t i = 0; i < semaphoreList.size(); ++i) {
auto& sema = nb::cast<const mscclpp::Semaphore&>(semaphoreList[i]);
auto sema = nb::cast<mscclpp::Semaphore>(semaphoreList[i]);
semaphores_.emplace_back(sema);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ Endpoint::Impl::Impl(const EndpointConfig& config, Context::Impl& contextImpl)
if (config_.maxWriteQueueSize <= 0) {
config_.maxWriteQueueSize = config_.ib.maxCqSize;
}
ibQp_ =
contextImpl.getIbContext(config_.transport)
->createQp(config_.ib.maxCqSize, config_.ib.maxCqPollNum, config_.ib.maxSendWr, 0, config_.ib.maxWrPerSend);
ibQp_ = contextImpl.getIbContext(config_.transport)
->createQp(config_.ib.port, config_.ib.gidIndex, config_.ib.maxCqSize, config_.ib.maxCqPollNum,
config_.ib.maxSendWr, 0, config_.ib.maxWrPerSend);
ibQpInfo_ = ibQp_->getInfo();
} else if (config_.transport == Transport::Ethernet) {
// Configuring Ethernet Interfaces
Expand Down
5 changes: 4 additions & 1 deletion src/env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ T readEnv(const std::string& envName, const T& defaultValue) {
return atoi(envCstr);
} else if constexpr (std::is_same_v<T, bool>) {
return (std::string(envCstr) != "0");
} else {
return T(envCstr);
}
return T(envCstr);
}

template <typename T>
Expand Down Expand Up @@ -52,6 +53,7 @@ Env::Env()
logSubsys(readEnv<std::string>("MSCCLPP_LOG_SUBSYS", "ALL")),
logFile(readEnv<std::string>("MSCCLPP_LOG_FILE", "")),
hcaDevices(readEnv<std::string>("MSCCLPP_HCA_DEVICES", "")),
ibvSo(readEnv<std::string>("MSCCLPP_IBV_SO", "")),
hostid(readEnv<std::string>("MSCCLPP_HOSTID", "")),
socketFamily(readEnv<std::string>("MSCCLPP_SOCKET_FAMILY", "")),
socketIfname(readEnv<std::string>("MSCCLPP_SOCKET_IFNAME", "")),
Expand All @@ -78,6 +80,7 @@ std::shared_ptr<Env> env() {
logEnv("MSCCLPP_LOG_SUBSYS", globalEnv->logSubsys);
logEnv("MSCCLPP_LOG_FILE", globalEnv->logFile);
logEnv("MSCCLPP_HCA_DEVICES", globalEnv->hcaDevices);
logEnv("MSCCLPP_IBV_SO", globalEnv->ibvSo);
logEnv("MSCCLPP_HOSTID", globalEnv->hostid);
logEnv("MSCCLPP_SOCKET_FAMILY", globalEnv->socketFamily);
logEnv("MSCCLPP_SOCKET_IFNAME", globalEnv->socketIfname);
Expand Down
64 changes: 34 additions & 30 deletions src/ib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ const void* IbMr::getBuff() const { return buff_; }

uint32_t IbMr::getLkey() const { return mr_->lkey; }

IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollNum, int maxSendWr, int maxRecvWr,
int maxWrPerSend)
: info_(),
IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxCqSize, int maxCqPollNum, int maxSendWr,
int maxRecvWr, int maxWrPerSend)
: portNum_(portNum),
gidIndex_(gidIndex),
info_(),
qp_(nullptr),
cq_(nullptr),
wcs_(),
Expand Down Expand Up @@ -164,25 +166,25 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollN
}

struct ibv_port_attr portAttr;
if (IBVerbs::ibv_query_port(ctx, port, &portAttr) != 0) {
if (IBVerbs::ibv_query_port(ctx, portNum_, &portAttr) != 0) {
THROW(NET, IbError, errno, "ibv_query_port failed (errno ", errno, ")");
}
info_.lid = portAttr.lid;
info_.port = port;
info_.linkLayer = portAttr.link_layer;
info_.qpn = qp->qp_num;
info_.mtu = portAttr.active_mtu;
info_.is_grh = (portAttr.flags & IBV_QPF_GRH_REQUIRED);

if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND || info_.is_grh) {
// portAttr.gid_tbl_len contains the number of valid GID entries
if (portAttr.gid_tbl_len == 0) {
THROW(NET, Error, ErrorCode::SystemError, "No GID table entries available for port ", port);
if (gidIndex_ >= portAttr.gid_tbl_len) {
THROW(NET, Error, ErrorCode::InvalidUsage, "invalid GID index ", gidIndex_, " for port ", portNum_,
" (max index is ", portAttr.gid_tbl_len - 1, ")");
}

union ibv_gid gid = {};
if (IBVerbs::ibv_query_gid(ctx, port, 0, &gid) != 0) {
THROW(NET, IbError, errno, "ibv_query_gid failed for port ", port, " index 0 (errno ", errno, ")");
if (IBVerbs::ibv_query_gid(ctx, portNum_, gidIndex_, &gid) != 0) {
THROW(NET, IbError, errno, "ibv_query_gid failed for port ", portNum_, " index ", gidIndex_, " (errno ", errno,
")");
}
info_.spn = gid.global.subnet_prefix;
info_.iid = gid.global.interface_id;
Expand All @@ -191,7 +193,7 @@ IbQp::IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollN
struct ibv_qp_attr qpAttr = {};
qpAttr.qp_state = IBV_QPS_INIT;
qpAttr.pkey_index = 0;
qpAttr.port_num = port;
qpAttr.port_num = portNum_;
qpAttr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC;
if (IBVerbs::ibv_modify_qp(qp, &qpAttr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS) != 0) {
THROW(NET, IbError, errno, "ibv_modify_qp failed (errno ", errno, ")");
Expand Down Expand Up @@ -220,7 +222,7 @@ void IbQp::rtr(const IbQpInfo& info) {
qp_attr.ah_attr.grh.dgid.global.subnet_prefix = info.spn;
qp_attr.ah_attr.grh.dgid.global.interface_id = info.iid;
qp_attr.ah_attr.grh.flow_label = 0;
qp_attr.ah_attr.grh.sgid_index = 0;
qp_attr.ah_attr.grh.sgid_index = gidIndex_;
qp_attr.ah_attr.grh.hop_limit = 255;
qp_attr.ah_attr.grh.traffic_class = 0;
} else {
Expand All @@ -229,7 +231,7 @@ void IbQp::rtr(const IbQpInfo& info) {
qp_attr.ah_attr.dlid = info.lid;
qp_attr.ah_attr.sl = 0;
qp_attr.ah_attr.src_path_bits = 0;
qp_attr.ah_attr.port_num = info.port;
qp_attr.ah_attr.port_num = portNum_;
int ret = IBVerbs::ibv_modify_qp(qp_, &qp_attr,
IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER);
Expand Down Expand Up @@ -374,7 +376,7 @@ IbCtx::~IbCtx() {
}
}

bool IbCtx::isPortUsable(int port) const {
bool IbCtx::isPortUsable(int port, int gidIndex) const {
struct ibv_port_attr portAttr = {};
if (IBVerbs::ibv_query_port(ctx_, port, &portAttr) != 0) {
THROW(NET, IbError, errno, "ibv_query_port failed (errno ", errno, ", port ", port, ")");
Expand All @@ -388,45 +390,47 @@ bool IbCtx::isPortUsable(int port) const {
return false;
}

// For Ethernet/RoCE or InfiniBand with GRH, check if GID table has entries
if (portAttr.link_layer == IBV_LINK_LAYER_ETHERNET || (portAttr.flags & IBV_QPF_GRH_REQUIRED)) {
if (portAttr.gid_tbl_len == 0) {
return false;
}
// Verify that at least one GID entry is queryable
union ibv_gid gid = {};
if (IBVerbs::ibv_query_gid(ctx_, port, 0, &gid) != 0) {
return false;
if (gidIndex >= 0) {
// For Ethernet/RoCE or InfiniBand with GRH, check if GID table has entries
if (portAttr.link_layer == IBV_LINK_LAYER_ETHERNET || (portAttr.flags & IBV_QPF_GRH_REQUIRED)) {
if (gidIndex >= portAttr.gid_tbl_len) {
return false;
}
union ibv_gid gid = {};
if (IBVerbs::ibv_query_gid(ctx_, port, gidIndex, &gid) != 0) {
return false;
}
}
}

return true;
}

int IbCtx::getAnyUsablePort() const {
int IbCtx::getAnyUsablePort(int gidIndex) const {
struct ibv_device_attr devAttr;
if (IBVerbs::ibv_query_device(ctx_, &devAttr) != 0) {
THROW(NET, IbError, errno, "ibv_query_device failed (errno ", errno, ")");
}
for (uint8_t port = 1; port <= devAttr.phys_port_cnt; ++port) {
if (this->isPortUsable(port)) {
if (this->isPortUsable(port, gidIndex)) {
return port;
}
}
return -1;
}

std::shared_ptr<IbQp> IbCtx::createQp(int maxCqSize, int maxCqPollNum, int maxSendWr, int maxRecvWr, int maxWrPerSend,
int port /*=-1*/) {
std::shared_ptr<IbQp> IbCtx::createQp(int port, int gidIndex, int maxCqSize, int maxCqPollNum, int maxSendWr,
int maxRecvWr, int maxWrPerSend) {
if (port == -1) {
port = this->getAnyUsablePort();
port = this->getAnyUsablePort(gidIndex);
if (port == -1) {
THROW(NET, Error, ErrorCode::InvalidUsage, "No usable port found (device: ", devName_, ")");
}
} else if (!this->isPortUsable(port)) {
} else if (!this->isPortUsable(port, gidIndex)) {
THROW(NET, Error, ErrorCode::InvalidUsage, "invalid IB port: ", port);
}
return std::shared_ptr<IbQp>(new IbQp(ctx_, pd_, port, maxCqSize, maxCqPollNum, maxSendWr, maxRecvWr, maxWrPerSend));
return std::shared_ptr<IbQp>(
new IbQp(ctx_, pd_, port, gidIndex, maxCqSize, maxCqPollNum, maxSendWr, maxRecvWr, maxWrPerSend));
}

std::unique_ptr<const IbMr> IbCtx::registerMr(void* buff, std::size_t size) {
Expand Down
16 changes: 12 additions & 4 deletions src/ibverbs_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <dlfcn.h>

#include <memory>
#include <mscclpp/env.hpp>

#include "logger.hpp"

Expand All @@ -15,12 +16,19 @@ static std::unique_ptr<void, int (*)(void*)> globalIBVerbsHandle(nullptr, &::dlc

void* IBVerbs::dlsym(const std::string& symbol, bool allowReturnNull) {
if (!globalIBVerbsHandle) {
const char* possibleLibNames[] = {"libibverbs.so", "libibverbs.so.1", nullptr};
for (int i = 0; possibleLibNames[i] != nullptr; i++) {
void* handle = ::dlopen(possibleLibNames[i], RTLD_NOW);
if (mscclpp::env()->ibvSo != "") {
void* handle = ::dlopen(mscclpp::env()->ibvSo.c_str(), RTLD_NOW);
if (handle) {
globalIBVerbsHandle.reset(handle);
Copy link

Copilot AI Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a custom libibverbs library is specified via MSCCLPP_IBV_SO but fails to load, the code silently falls through to the default library search without any warning or error. This could lead to confusion when the user expects a specific library to be used but a different one is loaded instead. Consider logging a warning or error message if the custom library path is specified but fails to load.

Suggested change
globalIBVerbsHandle.reset(handle);
globalIBVerbsHandle.reset(handle);
} else {
WARN(NET, "Failed to load custom libibverbs library specified by MSCCLPP_IBV_SO ('", mscclpp::env()->ibvSo, "'): ", std::string(::dlerror()));

Copilot uses AI. Check for mistakes.
break;
}
} else {
const char* possibleLibNames[] = {"libibverbs.so", "libibverbs.so.1", nullptr};
for (int i = 0; possibleLibNames[i] != nullptr; i++) {
void* handle = ::dlopen(possibleLibNames[i], RTLD_NOW);
if (handle) {
globalIBVerbsHandle.reset(handle);
break;
}
}
}
if (!globalIBVerbsHandle) {
Expand Down
22 changes: 10 additions & 12 deletions src/include/ib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class IbMr {
// QP info to be shared with the remote peer
struct IbQpInfo {
uint16_t lid;
uint8_t port;
uint8_t linkLayer;
uint32_t qpn;
uint64_t spn;
Expand Down Expand Up @@ -91,10 +90,13 @@ class IbQp {
ibv_sge* sge;
};

IbQp(ibv_context* ctx, ibv_pd* pd, int port, int maxCqSize, int maxCqPollNum, int maxSendWr, int maxRecvWr,
int maxWrPerSend);
IbQp(ibv_context* ctx, ibv_pd* pd, int portNum, int gidIndex, int maxCqSize, int maxCqPollNum, int maxSendWr,
int maxRecvWr, int maxWrPerSend);
WrInfo getNewWrInfo();

int portNum_;
int gidIndex_;

IbQpInfo info_;

ibv_qp* qp_;
Expand All @@ -118,18 +120,14 @@ class IbCtx {
IbCtx(const std::string& devName);
~IbCtx();

std::shared_ptr<IbQp> createQp(int maxCqSize, int maxCqPollNum, int maxSendWr, int maxRecvWr, int maxWrPerSend,
int port = -1);
std::shared_ptr<IbQp> createQp(int port, int gidIndex, int maxCqSize, int maxCqPollNum, int maxSendWr, int maxRecvWr,
int maxWrPerSend);
std::unique_ptr<const IbMr> registerMr(void* buff, std::size_t size);
#else
IbCtx([[maybe_unused]] const std::string& devName) {}
~IbCtx() {}

std::shared_ptr<IbQp> createQp([[maybe_unused]] int maxCqSize, [[maybe_unused]] int maxCqPollNum,
[[maybe_unused]] int maxSendWr, [[maybe_unused]] int maxRecvWr,
[[maybe_unused]] int maxWrPerSend, [[maybe_unused]] int port = -1) {
return nullptr;
}
std::shared_ptr<IbQp> createQp(int, int, int, int, int, int, int) { return nullptr; }
std::unique_ptr<const IbMr> registerMr([[maybe_unused]] void* buff, [[maybe_unused]] std::size_t size) {
return nullptr;
}
Expand All @@ -138,8 +136,8 @@ class IbCtx {
const std::string& getDevName() const { return devName_; };

private:
bool isPortUsable(int port) const;
int getAnyUsablePort() const;
bool isPortUsable(int port, int gidIndex) const;
int getAnyUsablePort(int gidIndex) const;

const std::string devName_;
ibv_context* ctx_;
Expand Down
Loading
Loading