diff --git a/src/object/srv_obj_migrate.c b/src/object/srv_obj_migrate.c index 89367adeb4b..39261a40dfa 100644 --- a/src/object/srv_obj_migrate.c +++ b/src/object/srv_obj_migrate.c @@ -32,10 +32,10 @@ #endif /* Max in-flight transfer size per xstream */ -/* Set the total in-flight size to be 50% of MAX DMA size for +/* Set the total in-flight size to be 1/3 of MAX DMA size for * the moment, will adjust it later if needed. */ -#define MIGR_TGT_INF_DATA (1 << 29) +#define MIGR_TGT_INF_DATA (300 << 20) /* Threshold for very large transfers. * This may exceed the MIGR_TGT_INF_DATA limit to prevent starvation. @@ -50,15 +50,12 @@ #define ENV_MIGRATE_ULT_CNT "D_MIGRATE_ULT_CNT" /* Number of migration ULTs per target */ -#define MIGR_TGT_ULTS_MIN 100 -#define MIGR_TGT_ULTS_DEF 500 -#define MIGR_TGT_ULTS_MAX 2000 +#define MIGR_TGT_OBJS_MIN 64 +#define MIGR_TGT_OBJS_DEF 128 +#define MIGR_TGT_OBJS_MAX 512 -/* 1/3 object ults, 2/3 key ULTs */ -#define MIGR_OBJ_ULT_PERCENT 33 - -#define MIGR_TGT_OBJ_ULTS(ults) ((ults * MIGR_OBJ_ULT_PERCENT) / 100) -#define MIGR_TGT_KEY_ULTS(ults) (ults - MIGR_TGT_OBJ_ULTS(ults)) +#define MIGR_TGT_OBJ_ULTS(ults) (ults) +#define MIGR_TGT_KEY_ULTS(ults) (ults * 2) enum { MIGR_OBJ = 0, @@ -182,6 +179,7 @@ struct iter_obj_arg { daos_unit_oid_t oid; daos_handle_t ioa_oh; int ioa_obj_ref; + unsigned int ioa_fanout; struct daos_oclass_attr ioa_oca; daos_epoch_t epoch; daos_epoch_t punched_epoch; @@ -710,16 +708,16 @@ mrone_obj_fetch_internal(struct migrate_one *mrone, daos_handle_t oh, d_sg_list_ D_WARN(DF_UUID" retry "DF_UOID" "DF_RC"\n", DP_UUID(tls->mpt_pool_uuid), DP_UOID(mrone->mo_oid), DP_RC(rc)); if (rc == -DER_NOMEM) { - /* sleep 10 seconds before retry, give other layers a chance to + /* sleep a few seconds before retry, give other layers a chance to * release resources. */ - dss_sleep(10 * 1000); + dss_sleep((10 + rand() % 20) * 1000); if (waited != 0 && waited % 3600 == 0) { DL_ERROR(rc, DF_RB ": waited for memory for %d hour(s)", DP_RB_MPT(tls), waited / 3600); } } - waited += 10; + waited += 20; D_GOTO(retry, rc); } @@ -1825,6 +1823,7 @@ migrate_one_destroy(struct migrate_one *mrone) D_ASSERT(d_list_empty(&mrone->mo_list)); daos_iov_free(&mrone->mo_dkey); + daos_iov_free(&mrone->mo_csum_iov); if (mrone->mo_iods_update_ephs) { for (i = 0; i < mrone->mo_iod_alloc_num; i++) { @@ -1932,12 +1931,16 @@ migrate_res_hold(struct migrate_pool_tls *tls, int res_type, long units, bool *y *yielded = waited; /* per-pool counters for rebuild status tracking */ - if (res_type == MIGR_OBJ) + if (res_type == MIGR_OBJ) { tls->mpt_tgt_obj_ult_cnt++; - else if (res_type == MIGR_KEY) + } else if (res_type == MIGR_KEY) { tls->mpt_tgt_dkey_ult_cnt++; - else + } else { tls->mpt_inflight_size += units; + /* remaining resource may be sufficient for more waiters */ + if (waited && res->res_units < res->res_limit) + ABT_cond_signal(res->res_cond); + } D_DEBUG(DB_REBUILD, "res=%s, hold=%lu, used=%lu, limit=%lu, waited=%d)\n" DF_RB @@ -1995,8 +1998,9 @@ static void migrate_one_ult(void *arg) { struct migrate_one *mrone = arg; + struct iter_obj_arg *ioa = mrone->mo_obj_arg; struct migrate_pool_tls *tls; - daos_size_t data_size; + daos_size_t data_size; int rc = 0; while (daos_fail_check(DAOS_REBUILD_TGT_REBUILD_HANG)) @@ -2009,21 +2013,20 @@ migrate_one_ult(void *arg) } data_size = daos_iods_len(mrone->mo_iods, mrone->mo_iod_num); - data_size += daos_iods_len(mrone->mo_iods_from_parity, - mrone->mo_iods_num_from_parity); + data_size += daos_iods_len(mrone->mo_iods_from_parity, mrone->mo_iods_num_from_parity); D_DEBUG(DB_TRACE, "mrone %p data size is "DF_U64" %d/%d\n", mrone, data_size, mrone->mo_iod_num, mrone->mo_iods_num_from_parity); D_ASSERT(data_size != (daos_size_t)-1); - rc = migrate_res_hold(tls, MIGR_DATA, data_size, NULL); + rc = migrate_res_hold(tls, MIGR_DATA, data_size * ioa->ioa_fanout, NULL); if (rc) D_GOTO(out, rc); rc = migrate_dkey(tls, mrone, data_size); - migrate_res_release(tls, MIGR_DATA, data_size); + migrate_res_release(tls, MIGR_DATA, data_size * ioa->ioa_fanout); D_DEBUG(DB_REBUILD, DF_UOID" layout %u migrate dkey "DF_KEY" inflight_size "DF_U64": " DF_RC"\n", DP_UOID(mrone->mo_oid), mrone->mo_oid.id_layout_ver, @@ -2045,7 +2048,7 @@ migrate_one_ult(void *arg) tls->mpt_fini = 1; } out: - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, MIGR_KEY, ioa->ioa_fanout); migrate_one_destroy(mrone); } @@ -2811,7 +2814,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) continue; } - rc = migrate_res_hold(tls, MIGR_KEY, 1, NULL); + rc = migrate_res_hold(tls, MIGR_KEY, arg->ioa_fanout, NULL); if (rc) break; d_list_del_init(&mrone->mo_list); @@ -2825,7 +2828,7 @@ migrate_start_ult(struct enum_unpack_arg *unpack_arg) rc = dss_ult_create(migrate_one_ult, mrone, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); if (rc) { - migrate_res_release(tls, MIGR_KEY, 1); + migrate_res_release(tls, MIGR_KEY, arg->ioa_fanout); migrate_one_destroy(mrone); break; } @@ -3305,7 +3308,7 @@ migrate_obj_ult(void *data) tls->mpt_status = rc; free_notls: - migrate_res_release(tls, MIGR_OBJ, 1); + migrate_res_release(tls, MIGR_OBJ, arg->ioa_fanout); migrate_obj_put(arg); } @@ -3319,10 +3322,11 @@ struct migrate_obj_val { /* This is still running on the main migration ULT */ static int migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_eph, - unsigned int shard, unsigned int tgt_idx, void *data) + unsigned int shard, unsigned int tgt_idx, void *data, bool *yielded) { struct iter_cont_arg *cont_arg = data; struct iter_obj_arg *obj_arg; + struct daos_oclass_attr *oc; struct migrate_pool_tls *tls = cont_arg->pool_tls; daos_handle_t toh = tls->mpt_migrated_root_hdl; struct migrate_obj_val val; @@ -3348,6 +3352,17 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e uuid_copy(obj_arg->cont_uuid, cont_arg->cont_uuid); obj_arg->version = cont_arg->pool_tls->mpt_version; obj_arg->generation = cont_arg->pool_tls->mpt_generation; + oc = daos_oclass_attr_find(oid.id_pub, NULL); + if (!oc) { + rc = -DER_NOSCHEMA; + DL_ERROR(rc, DF_RB " invalid object ID\n", DP_RB_MPT(tls)); + goto free; + } + if (daos_oclass_is_ec(oc)) + obj_arg->ioa_fanout = MIN(16, obj_ec_data_tgt_nr(oc)); + else + obj_arg->ioa_fanout = 1; + if (cont_arg->snaps) { D_ALLOC(obj_arg->snaps, sizeof(*cont_arg->snaps) * cont_arg->snap_cnt); @@ -3359,10 +3374,18 @@ migrate_one_object(daos_unit_oid_t oid, daos_epoch_t eph, daos_epoch_t punched_e sizeof(*obj_arg->snaps) * cont_arg->snap_cnt); } + rc = migrate_res_hold(tls, MIGR_OBJ, obj_arg->ioa_fanout, yielded); + if (rc != 0) { + DL_ERROR(rc, DF_UUID " enter migrate failed.", DP_UUID(tls->mpt_pool_uuid)); + goto free; + } + D_ASSERT(tgt_idx == dss_get_module_info()->dmi_tgt_id); rc = dss_ult_create(migrate_obj_ult, obj_arg, DSS_XS_SELF, 0, MIGRATE_STACK_SIZE, NULL); - if (rc) + if (rc) { + migrate_res_release(tls, MIGR_OBJ, obj_arg->ioa_fanout); goto free; + } val.epoch = eph; val.shard = shard; @@ -3405,17 +3428,9 @@ migrate_obj_iter_cb(daos_handle_t ih, d_iov_t *key_iov, d_iov_t *val_iov, void * " eph "DF_U64" start\n", DP_UUID(arg->cont_uuid), DP_UOID(*oid), ih.cookie, epoch); - rc = migrate_res_hold(arg->pool_tls, MIGR_OBJ, 1, &yielded); - if (rc != 0) { - DL_ERROR(rc, DF_UUID" enter migrate failed.", DP_UUID(arg->cont_uuid)); - return rc; - } - - rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg); + rc = migrate_one_object(*oid, epoch, punched_epoch, shard, tgt_idx, arg, &yielded); if (rc != 0) { - D_ERROR("obj "DF_UOID" migration failed: "DF_RC"\n", - DP_UOID(*oid), DP_RC(rc)); - migrate_res_release(arg->pool_tls, MIGR_OBJ, 1); + D_ERROR("obj " DF_UOID " migration failed: " DF_RC "\n", DP_UOID(*oid), DP_RC(rc)); return rc; } @@ -4148,7 +4163,7 @@ migr_res_fini(struct migr_resource *res) int obj_migrate_init(void) { - unsigned int ults = MIGR_TGT_ULTS_DEF; + unsigned int ults = MIGR_TGT_OBJS_DEF; int i; int rc = 0; @@ -4156,10 +4171,10 @@ obj_migrate_init(void) D_CASSERT(MIGR_TGT_INF_DATA > MIGR_INF_DATA_HULK); d_getenv_uint(ENV_MIGRATE_ULT_CNT, &ults); - if (ults < MIGR_TGT_ULTS_MIN) - ults = MIGR_TGT_ULTS_MIN; - if (ults > MIGR_TGT_ULTS_MAX) - ults = MIGR_TGT_ULTS_MAX; + if (ults < MIGR_TGT_OBJS_MIN) + ults = MIGR_TGT_OBJS_MIN; + if (ults > MIGR_TGT_OBJS_MAX) + ults = MIGR_TGT_OBJS_MAX; memset(&migr_eng_res, 0, sizeof(migr_eng_res)); migr_eng_res.er_max_ults = ults;