Skip to content
99 changes: 57 additions & 42 deletions src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
#endif

/* Max in-flight transfer size per xstream */
/* Set the total in-flight size to be 50% of MAX DMA size for
/* Set the total in-flight size to be 1/3 of MAX DMA size for
* the moment, will adjust it later if needed.
*/
#define MIGR_TGT_INF_DATA (1 << 29)
#define MIGR_TGT_INF_DATA (300 << 20)

/* Threshold for very large transfers.
* This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation.
Expand All @@ -50,15 +50,12 @@
#define ENV_MIGRATE_ULT_CNT "D_MIGRATE_ULT_CNT"

/* Number of migration ULTs per target */
#define MIGR_TGT_ULTS_MIN 100
#define MIGR_TGT_ULTS_DEF 500
#define MIGR_TGT_ULTS_MAX 2000
#define MIGR_TGT_OBJS_MIN 64
#define MIGR_TGT_OBJS_DEF 128
#define MIGR_TGT_OBJS_MAX 512

/* 1/3 object ults, 2/3 key ULTs */
#define MIGR_OBJ_ULT_PERCENT 33

#define MIGR_TGT_OBJ_ULTS(ults) ((ults * MIGR_OBJ_ULT_PERCENT) / 100)
#define MIGR_TGT_KEY_ULTS(ults) (ults - MIGR_TGT_OBJ_ULTS(ults))
#define MIGR_TGT_OBJ_ULTS(ults) (ults)
#define MIGR_TGT_KEY_ULTS(ults) (ults * 2)

enum {
MIGR_OBJ = 0,
Expand Down Expand Up @@ -182,6 +179,7 @@
daos_unit_oid_t oid;
daos_handle_t ioa_oh;
int ioa_obj_ref;
unsigned int ioa_fanout;
struct daos_oclass_attr ioa_oca;
daos_epoch_t epoch;
daos_epoch_t punched_epoch;
Expand Down Expand Up @@ -710,16 +708,16 @@
D_WARN(DF_UUID" retry "DF_UOID" "DF_RC"\n",
DP_UUID(tls->mpt_pool_uuid), DP_UOID(mrone->mo_oid), DP_RC(rc));
if (rc == -DER_NOMEM) {
/* sleep 10 seconds before retry, give other layers a chance to
/* sleep a few seconds before retry, give other layers a chance to
* release resources.
*/
dss_sleep(10 * 1000);
dss_sleep((10 + rand() % 20) * 1000);
if (waited != 0 && waited % 3600 == 0) {
DL_ERROR(rc, DF_RB ": waited for memory for %d hour(s)",
DP_RB_MPT(tls), waited / 3600);
}
}
waited += 10;
waited += 20;
D_GOTO(retry, rc);
}

Expand Down Expand Up @@ -1825,6 +1823,7 @@

D_ASSERT(d_list_empty(&mrone->mo_list));
daos_iov_free(&mrone->mo_dkey);
daos_iov_free(&mrone->mo_csum_iov);

if (mrone->mo_iods_update_ephs) {
for (i = 0; i < mrone->mo_iod_alloc_num; i++) {
Expand Down Expand Up @@ -1932,12 +1931,16 @@
*yielded = waited;

/* per-pool counters for rebuild status tracking */
if (res_type == MIGR_OBJ)
if (res_type == MIGR_OBJ) {
tls->mpt_tgt_obj_ult_cnt++;
else if (res_type == MIGR_KEY)
} else if (res_type == MIGR_KEY) {
tls->mpt_tgt_dkey_ult_cnt++;
else
} else {
tls->mpt_inflight_size += units;
/* remaining resource may be sufficient for more waiters */
if (waited && res->res_units < res->res_limit)
ABT_cond_signal(res->res_cond);
}

D_DEBUG(DB_REBUILD,
"res=%s, hold=%lu, used=%lu, limit=%lu, waited=%d)\n" DF_RB
Expand Down Expand Up @@ -1995,8 +1998,9 @@
migrate_one_ult(void *arg)
{
struct migrate_one *mrone = arg;
struct iter_obj_arg *ioa = mrone->mo_obj_arg;
struct migrate_pool_tls *tls;
daos_size_t data_size;
daos_size_t data_size;
int rc = 0;

while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG))
Expand All @@ -2009,21 +2013,20 @@
}

data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num);
data_size += daos_iods_len(mrone->mo_iods_from_parity,
mrone->mo_iods_num_from_parity);
data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity);

D_DEBUG(DB_TRACE, "mrone %p data size is "DF_U64" %d/%d\n",
mrone, data_size, mrone->mo_iod_num, mrone->mo_iods_num_from_parity);

D_ASSERT(data_size != (daos_size_t)-1);

rc = migrate_res_hold(tls, MIGR_DATA, data_size, NULL);
rc = migrate_res_hold(tls, MIGR_DATA, data_size * ioa->ioa_fanout, NULL);
if (rc)
D_GOTO(out, rc);

rc = migrate_dkey(tls, mrone, data_size);

migrate_res_release(tls, MIGR_DATA, data_size);
migrate_res_release(tls, MIGR_DATA, data_size * ioa->ioa_fanout);

D_DEBUG(DB_REBUILD, DF_UOID" layout %u migrate dkey "DF_KEY" inflight_size "DF_U64": "
DF_RC"\n", DP_UOID(mrone->mo_oid), mrone->mo_oid.id_layout_ver,
Expand All @@ -2045,7 +2048,7 @@
tls->mpt_fini = 1;
}
out:
migrate_res_release(tls, MIGR_KEY, 1);
migrate_res_release(tls, MIGR_KEY, ioa->ioa_fanout);
migrate_one_destroy(mrone);
}

Expand Down Expand Up @@ -2811,7 +2814,7 @@
continue;
}

rc = migrate_res_hold(tls, MIGR_KEY, 1, NULL);
rc = migrate_res_hold(tls, MIGR_KEY, arg->ioa_fanout, NULL);
if (rc)
break;
d_list_del_init(&mrone->mo_list);
Expand All @@ -2825,7 +2828,7 @@
rc = dss_ult_create(migrate_one_ult, mrone, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE,
NULL);
if (rc) {
migrate_res_release(tls, MIGR_KEY, 1);
migrate_res_release(tls, MIGR_KEY, arg->ioa_fanout);
migrate_one_destroy(mrone);
break;
}
Expand Down Expand Up @@ -3305,7 +3308,7 @@
tls->mpt_status = rc;

free_notls:
migrate_res_release(tls, MIGR_OBJ, 1);
migrate_res_release(tls, MIGR_OBJ, arg->ioa_fanout);
migrate_obj_put(arg);
}

Expand All @@ -3319,10 +3322,11 @@
/* This is still running on the main migration ULT */
static int
migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph,
unsigned int shard, unsigned int tgt_idx, void *data)
unsigned int shard, unsigned int tgt_idx, void *data, bool *yielded)
{
struct iter_cont_arg *cont_arg = data;
struct iter_obj_arg *obj_arg;
struct daos_oclass_attr *oc;
struct migrate_pool_tls *tls = cont_arg->pool_tls;
daos_handle_t toh = tls->mpt_migrated_root_hdl;
struct migrate_obj_val val;
Expand All @@ -3348,6 +3352,17 @@
uuid_copy(obj_arg->cont_uuid, cont_arg->cont_uuid);
obj_arg->version = cont_arg->pool_tls->mpt_version;
obj_arg->generation = cont_arg->pool_tls->mpt_generation;
oc = daos_oclass_attr_find(oid.id_pub, NULL);
if (!oc) {
rc = -DER_NOSCHEMA;
DL_ERROR(rc, DF_RB " invalid object ID\n", DP_RB_MPT(tls));

Check warning on line 3358 in src/object/srv_obj_migrate.c

View workflow job for this annotation

GitHub Actions / Logging macro checking

check-return, Line contains too many newlines
goto free;
}
if (daos_oclass_is_ec(oc))
obj_arg->ioa_fanout = MIN(16, obj_ec_data_tgt_nr(oc));
else
obj_arg->ioa_fanout = 1;

if (cont_arg->snaps) {
D_ALLOC(obj_arg->snaps,
sizeof(*cont_arg->snaps) * cont_arg->snap_cnt);
Expand All @@ -3359,10 +3374,18 @@
sizeof(*obj_arg->snaps) * cont_arg->snap_cnt);
}

rc = migrate_res_hold(tls, MIGR_OBJ, obj_arg->ioa_fanout, yielded);
if (rc != 0) {
DL_ERROR(rc, DF_UUID " enter migrate failed.", DP_UUID(tls->mpt_pool_uuid));
goto free;
}

D_ASSERT(tgt_idx == dss_get_module_info()->dmi_tgt_id);
rc = dss_ult_create(migrate_obj_ult, obj_arg, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL);
if (rc)
if (rc) {
migrate_res_release(tls, MIGR_OBJ, obj_arg->ioa_fanout);
goto free;
}

val.epoch = eph;
val.shard = shard;
Expand Down Expand Up @@ -3405,17 +3428,9 @@
" eph "DF_U64" start\n", DP_UUID(arg->cont_uuid), DP_UOID(*oid),
ih.cookie, epoch);

rc = migrate_res_hold(arg->pool_tls, MIGR_OBJ, 1, &yielded);
if (rc != 0) {
DL_ERROR(rc, DF_UUID" enter migrate failed.", DP_UUID(arg->cont_uuid));
return rc;
}

rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg);
rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg, &yielded);
if (rc != 0) {
D_ERROR("obj "DF_UOID" migration failed: "DF_RC"\n",
DP_UOID(*oid), DP_RC(rc));
migrate_res_release(arg->pool_tls, MIGR_OBJ, 1);
D_ERROR("obj " DF_UOID " migration failed: " DF_RC "\n", DP_UOID(*oid), DP_RC(rc));
return rc;
}

Expand Down Expand Up @@ -4148,18 +4163,18 @@
int
obj_migrate_init(void)
{
unsigned int ults = MIGR_TGT_ULTS_DEF;
unsigned int ults = MIGR_TGT_OBJS_DEF;
int i;
int rc = 0;

D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_LWM);
D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_HULK);

d_getenv_uint(ENV_MIGRATE_ULT_CNT, &ults);
if (ults < MIGR_TGT_ULTS_MIN)
ults = MIGR_TGT_ULTS_MIN;
if (ults > MIGR_TGT_ULTS_MAX)
ults = MIGR_TGT_ULTS_MAX;
if (ults < MIGR_TGT_OBJS_MIN)
ults = MIGR_TGT_OBJS_MIN;
if (ults > MIGR_TGT_OBJS_MAX)
ults = MIGR_TGT_OBJS_MAX;

memset(&migr_eng_res, 0, sizeof(migr_eng_res));
migr_eng_res.er_max_ults = ults;
Expand Down
Loading