Diffstat (limited to 'xlators/cluster/ec/src/ec.c')
| -rw-r--r-- | xlators/cluster/ec/src/ec.c | 213 |
1 file changed, 197 insertions, 16 deletions
diff --git a/xlators/cluster/ec/src/ec.c b/xlators/cluster/ec/src/ec.c
index 3c8013e8297..7344be4968d 100644
--- a/xlators/cluster/ec/src/ec.c
+++ b/xlators/cluster/ec/src/ec.c
@@ -285,6 +285,7 @@ reconfigure(xlator_t *this, dict_t *options)
     GF_OPTION_RECONF("parallel-writes", ec->parallel_writes, options, bool,
                      failed);
     GF_OPTION_RECONF("stripe-cache", ec->stripe_cache, options, uint32, failed);
+    GF_OPTION_RECONF("quorum-count", ec->quorum_count, options, uint32, failed);
     ret = 0;
     if (ec_assign_read_policy(ec, read_policy)) {
         ret = -1;
@@ -324,13 +325,18 @@ ec_get_event_from_state(ec_t *ec)
 void
 ec_up(xlator_t *this, ec_t *ec)
 {
+    char str1[32], str2[32];
+
     if (ec->timer != NULL) {
         gf_timer_call_cancel(this->ctx, ec->timer);
         ec->timer = NULL;
     }
 
     ec->up = 1;
-    gf_msg(this->name, GF_LOG_INFO, 0, EC_MSG_EC_UP, "Going UP");
+    gf_msg(this->name, GF_LOG_INFO, 0, EC_MSG_EC_UP,
+           "Going UP : Child UP = %s Child Notify = %s",
+           ec_bin(str1, sizeof(str1), ec->xl_up, ec->nodes),
+           ec_bin(str2, sizeof(str2), ec->xl_notify, ec->nodes));
 
     gf_event(EVENT_EC_MIN_BRICKS_UP, "subvol=%s", this->name);
 }
@@ -338,13 +344,18 @@ ec_up(xlator_t *this, ec_t *ec)
 void
 ec_down(xlator_t *this, ec_t *ec)
 {
+    char str1[32], str2[32];
+
     if (ec->timer != NULL) {
         gf_timer_call_cancel(this->ctx, ec->timer);
         ec->timer = NULL;
     }
 
     ec->up = 0;
-    gf_msg(this->name, GF_LOG_INFO, 0, EC_MSG_EC_DOWN, "Going DOWN");
+    gf_msg(this->name, GF_LOG_INFO, 0, EC_MSG_EC_DOWN,
+           "Going DOWN : Child UP = %s Child Notify = %s",
+           ec_bin(str1, sizeof(str1), ec->xl_up, ec->nodes),
+           ec_bin(str2, sizeof(str2), ec->xl_notify, ec->nodes));
 
     gf_event(EVENT_EC_MIN_BRICKS_NOT_UP, "subvol=%s", this->name);
 }
@@ -355,6 +366,7 @@ ec_notify_cbk(void *data)
     ec_t *ec = data;
     glusterfs_event_t event = GF_EVENT_MAXVAL;
     gf_boolean_t propagate = _gf_false;
+    gf_boolean_t launch_heal = _gf_false;
 
     LOCK(&ec->lock);
     {
@@ -384,6 +396,11 @@ ec_notify_cbk(void *data)
              * still bricks DOWN, they will be healed when they
              * come up. */
             ec_up(ec->xl, ec);
+
+            if (ec->shd.iamshd && !ec->shutdown) {
+                launch_heal = _gf_true;
+                GF_ATOMIC_INC(ec->async_fop_count);
+            }
         }
 
         propagate = _gf_true;
@@ -391,13 +408,12 @@ unlock:
     UNLOCK(&ec->lock);
 
+    if (launch_heal) {
+        /* We have just brought the volume UP, so we trigger
+         * a self-heal check on the root directory. */
+        ec_launch_replace_heal(ec);
+    }
     if (propagate) {
-        if ((event == GF_EVENT_CHILD_UP) && ec->shd.iamshd) {
-            /* We have just brought the volume UP, so we trigger
-             * a self-heal check on the root directory. */
-            ec_launch_replace_heal(ec);
-        }
-
         default_notify(ec->xl, event, NULL);
     }
 }
 
@@ -425,10 +441,55 @@ ec_disable_delays(ec_t *ec)
 {
     ec->shutdown = _gf_true;
 
-    return list_empty(&ec->pending_fops);
+    return __ec_is_last_fop(ec);
 }
 
 void
+ec_cleanup_healer_object(ec_t *ec)
+{
+    struct subvol_healer *healer = NULL;
+    ec_self_heald_t *shd = NULL;
+    void *res = NULL;
+    int i = 0;
+    gf_boolean_t is_join = _gf_false;
+
+    shd = &ec->shd;
+    if (!shd->iamshd)
+        return;
+
+    for (i = 0; i < ec->nodes; i++) {
+        healer = &shd->index_healers[i];
+        pthread_mutex_lock(&healer->mutex);
+        {
+            healer->rerun = 1;
+            if (healer->running) {
+                pthread_cond_signal(&healer->cond);
+                is_join = _gf_true;
+            }
+        }
+        pthread_mutex_unlock(&healer->mutex);
+        if (is_join) {
+            pthread_join(healer->thread, &res);
+            is_join = _gf_false;
+        }
+
+        healer = &shd->full_healers[i];
+        pthread_mutex_lock(&healer->mutex);
+        {
+            healer->rerun = 1;
+            if (healer->running) {
+                pthread_cond_signal(&healer->cond);
+                is_join = _gf_true;
+            }
+        }
+        pthread_mutex_unlock(&healer->mutex);
+        if (is_join) {
+            pthread_join(healer->thread, &res);
+            is_join = _gf_false;
+        }
+    }
+}
+void
 ec_pending_fops_completed(ec_t *ec)
 {
     if (ec->shutdown) {
@@ -441,6 +502,9 @@ ec_set_up_state(ec_t *ec, uintptr_t index_mask, uintptr_t new_state)
 {
     uintptr_t current_state = 0;
 
+    if (xlator_is_cleanup_starting(ec->xl))
+        return _gf_false;
+
     if ((ec->xl_notify & index_mask) == 0) {
         ec->xl_notify |= index_mask;
         ec->xl_notify_count++;
@@ -462,6 +526,7 @@ ec_upcall(ec_t *ec, struct gf_upcall *upcall)
     struct gf_upcall_cache_invalidation *ci = NULL;
     struct gf_upcall_inodelk_contention *lc = NULL;
     inode_t *inode;
+    inode_table_t *table;
 
     switch (upcall->event_type) {
         case GF_UPCALL_CACHE_INVALIDATION:
@@ -475,8 +540,18 @@ ec_upcall(ec_t *ec, struct gf_upcall *upcall)
                 /* The lock is not owned by EC, ignore it. */
                 return _gf_true;
             }
-            inode = inode_find(((xlator_t *)ec->xl->graph->top)->itable,
-                               upcall->gfid);
+            table = ((xlator_t *)ec->xl->graph->top)->itable;
+            if (table == NULL) {
+                /* Self-heal daemon doesn't have an inode table on the top
+                 * xlator because it doesn't need it. In this case we should
+                 * use the inode table managed by EC itself where all inodes
+                 * being healed should be present. However self-heal doesn't
+                 * use eager-locking and inodelk's are already released as
+                 * soon as possible. In this case we can safely ignore these
+                 * notifications. */
+                return _gf_false;
+            }
+            inode = inode_find(table, upcall->gfid);
             /* If inode is not found, it means that it's already released,
              * so we can ignore it. Probably it has been released and
             *  destroyed while the contention notification was being sent.
@@ -544,6 +619,7 @@ ec_notify(xlator_t *this, int32_t event, void *data, void *data2)
         /* If there aren't pending fops running after we have waken up
         *  them, we immediately propagate the notification. */
         propagate = ec_disable_delays(ec);
+        ec_cleanup_healer_object(ec);
         goto unlock;
     }
 
@@ -554,7 +630,10 @@ ec_notify(xlator_t *this, int32_t event, void *data, void *data2)
         if (event == GF_EVENT_CHILD_UP) {
            /* We need to trigger a selfheal if a brick changes
             * to UP state. */
-            needs_shd_check = ec_set_up_state(ec, mask, mask);
+            if (ec_set_up_state(ec, mask, mask) && ec->shd.iamshd &&
+                !ec->shutdown) {
+                needs_shd_check = _gf_true;
+            }
         } else if (event == GF_EVENT_CHILD_DOWN) {
             ec_set_up_state(ec, mask, 0);
         }
@@ -584,17 +663,21 @@ ec_notify(xlator_t *this, int32_t event, void *data, void *data2)
             }
         } else {
             propagate = _gf_false;
+            needs_shd_check = _gf_false;
+        }
+
+        if (needs_shd_check) {
+            GF_ATOMIC_INC(ec->async_fop_count);
         }
     }
 unlock:
     UNLOCK(&ec->lock);
 
 done:
+    if (needs_shd_check) {
+        ec_launch_replace_heal(ec);
+    }
     if (propagate) {
-        if (needs_shd_check && ec->shd.iamshd) {
-            ec_launch_replace_heal(ec);
-        }
-
         error = default_notify(this, event, data);
     }
 
@@ -627,6 +710,69 @@ ec_statistics_init(ec_t *ec)
     GF_ATOMIC_INIT(ec->stats.stripe_cache.evicts, 0);
     GF_ATOMIC_INIT(ec->stats.stripe_cache.allocs, 0);
     GF_ATOMIC_INIT(ec->stats.stripe_cache.errors, 0);
+    GF_ATOMIC_INIT(ec->stats.shd.attempted, 0);
+    GF_ATOMIC_INIT(ec->stats.shd.completed, 0);
+}
+
+static int
+ec_assign_read_mask(ec_t *ec, char *read_mask_str)
+{
+    char *mask = NULL;
+    char *maskptr = NULL;
+    char *saveptr = NULL;
+    char *id_str = NULL;
+    int id = 0;
+    int ret = 0;
+    uintptr_t read_mask = 0;
+
+    if (!read_mask_str) {
+        ec->read_mask = 0;
+        ret = 0;
+        goto out;
+    }
+
+    mask = gf_strdup(read_mask_str);
+    if (!mask) {
+        ret = -1;
+        goto out;
+    }
+    maskptr = mask;
+
+    for (;;) {
+        id_str = strtok_r(maskptr, ":", &saveptr);
+        if (id_str == NULL)
+            break;
+        if (gf_string2int(id_str, &id)) {
+            gf_msg(ec->xl->name, GF_LOG_ERROR, 0, EC_MSG_XLATOR_INIT_FAIL,
+                   "In read-mask \"%s\" id %s is not a valid integer",
+                   read_mask_str, id_str);
+            ret = -1;
+            goto out;
+        }
+
+        if ((id < 0) || (id >= ec->nodes)) {
+            gf_msg(ec->xl->name, GF_LOG_ERROR, 0, EC_MSG_XLATOR_INIT_FAIL,
+                   "In read-mask \"%s\" id %d is not in range [0 - %d]",
+                   read_mask_str, id, ec->nodes - 1);
+            ret = -1;
+            goto out;
+        }
+        read_mask |= (1UL << id);
+        maskptr = NULL;
+    }
+
+    if (gf_bits_count(read_mask) < ec->fragments) {
+        gf_msg(ec->xl->name, GF_LOG_ERROR, 0, EC_MSG_XLATOR_INIT_FAIL,
+               "read-mask \"%s\" should contain at least %d ids", read_mask_str,
+               ec->fragments);
+        ret = -1;
+        goto out;
+    }
+    ec->read_mask = read_mask;
+    ret = 0;
+out:
+    GF_FREE(mask);
+    return ret;
 }
 
 int32_t
@@ -636,6 +782,7 @@ init(xlator_t *this)
     char *read_policy = NULL;
     char *extensions = NULL;
     int32_t err;
+    char *read_mask_str = NULL;
 
     if (this->parents == NULL) {
         gf_msg(this->name, GF_LOG_WARNING, 0, EC_MSG_NO_PARENTS,
@@ -656,6 +803,7 @@ init(xlator_t *this)
     ec->xl = this;
     LOCK_INIT(&ec->lock);
+    GF_ATOMIC_INIT(ec->async_fop_count, 0);
 
     INIT_LIST_HEAD(&ec->pending_fops);
     INIT_LIST_HEAD(&ec->heal_waiting);
     INIT_LIST_HEAD(&ec->healing);
@@ -714,12 +862,18 @@ init(xlator_t *this)
     if (ec_assign_read_policy(ec, read_policy))
         goto failed;
 
+    GF_OPTION_INIT("heal-timeout", ec->shd.timeout, int32, failed);
     GF_OPTION_INIT("shd-max-threads", ec->shd.max_threads, uint32, failed);
     GF_OPTION_INIT("shd-wait-qlength", ec->shd.wait_qlength, uint32, failed);
     GF_OPTION_INIT("optimistic-change-log", ec->optimistic_changelog, bool,
                    failed);
     GF_OPTION_INIT("parallel-writes", ec->parallel_writes, bool, failed);
     GF_OPTION_INIT("stripe-cache", ec->stripe_cache, uint32, failed);
+    GF_OPTION_INIT("quorum-count", ec->quorum_count, uint32, failed);
+    GF_OPTION_INIT("ec-read-mask", read_mask_str, str, failed);
+
+    if (ec_assign_read_mask(ec, read_mask_str))
+        goto failed;
 
     this->itable = inode_table_new(EC_SHD_INODE_LRU_LIMIT, this);
     if (!this->itable)
@@ -759,6 +913,7 @@ failed:
 void
 fini(xlator_t *this)
 {
+    ec_selfheal_daemon_fini(this);
     __ec_destroy_private(this);
 }
 
@@ -1394,6 +1549,10 @@ ec_dump_private(xlator_t *this)
     gf_proc_dump_write("childs_up", "%u", ec->xl_up_count);
     gf_proc_dump_write("childs_up_mask", "%s",
                        ec_bin(tmp, sizeof(tmp), ec->xl_up, ec->nodes));
+    if (ec->read_mask) {
+        gf_proc_dump_write("read-mask", "%s",
+                           ec_bin(tmp, sizeof(tmp), ec->read_mask, ec->nodes));
+    }
     gf_proc_dump_write("background-heals", "%d", ec->background_heals);
     gf_proc_dump_write("heal-wait-qlength", "%d", ec->heal_wait_qlen);
     gf_proc_dump_write("self-heal-window-size", "%" PRIu32,
@@ -1402,6 +1561,7 @@ ec_dump_private(xlator_t *this)
     gf_proc_dump_write("heal-waiters", "%d", ec->heal_waiters);
     gf_proc_dump_write("read-policy", "%s", ec_read_policies[ec->read_policy]);
     gf_proc_dump_write("parallel-writes", "%d", ec->parallel_writes);
+    gf_proc_dump_write("quorum-count", "%u", ec->quorum_count);
 
     snprintf(key_prefix, GF_DUMP_MAX_BUF_LEN, "%s.%s.stats.stripe_cache",
              this->type, this->name);
@@ -1421,6 +1581,10 @@ ec_dump_private(xlator_t *this)
                        GF_ATOMIC_GET(ec->stats.stripe_cache.allocs));
     gf_proc_dump_write("errors", "%" GF_PRI_ATOMIC,
                        GF_ATOMIC_GET(ec->stats.stripe_cache.errors));
+    gf_proc_dump_write("heals-attempted", "%" GF_PRI_ATOMIC,
+                       GF_ATOMIC_GET(ec->stats.shd.attempted));
+    gf_proc_dump_write("heals-completed", "%" GF_PRI_ATOMIC,
+                       GF_ATOMIC_GET(ec->stats.shd.completed));
 
     return 0;
 }
@@ -1672,6 +1836,23 @@ struct volume_options options[] = {
                     "lead to extra memory consumption, maximum "
                     "(cache size * stripe size) Bytes per open file."},
     {
+        .key = {"quorum-count"},
+        .type = GF_OPTION_TYPE_INT,
+        .default_value = "0",
+        .description =
+            "This option can be used to define how many successes on "
+            "the bricks constitute a success to the application. This"
+            " count should be in the range "
+            "[disperse-data-count, disperse-count] (inclusive)",
+    },
+    {
+        .key = {"ec-read-mask"},
+        .type = GF_OPTION_TYPE_STR,
+        .default_value = NULL,
+        .description = "This option can be used to choose which bricks can be"
+                       " used for reading data/metadata of a file/directory",
+    },
+    {
         .key = {NULL},
     },
 };
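Note on the new ec_assign_read_mask() above: it turns the colon-separated "ec-read-mask" option string into a brick bitmask stored in ec->read_mask, rejecting ids outside [0, ec->nodes - 1] and requiring at least ec->fragments distinct bricks. The standalone sketch below is illustration only, not GlusterFS code: parse_read_mask() and the main() driver are hypothetical stand-ins that use strtol() and a manual popcount instead of gf_string2int() and gf_bits_count().

/* Illustration only: builds the same kind of brick bitmask that
 * ec_assign_read_mask() derives from a string such as "0:2:3:5". */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static int
parse_read_mask(const char *str, int nodes, int fragments, uintptr_t *out)
{
    char *copy = strdup(str);
    char *saveptr = NULL;
    char *tok = NULL;
    uintptr_t mask = 0;
    int bits = 0;

    if (copy == NULL)
        return -1;

    for (tok = strtok_r(copy, ":", &saveptr); tok != NULL;
         tok = strtok_r(NULL, ":", &saveptr)) {
        char *end = NULL;
        long id = strtol(tok, &end, 10);

        /* Reject non-numeric ids and ids outside [0, nodes - 1]. */
        if (*end != '\0' || id < 0 || id >= nodes) {
            free(copy);
            return -1;
        }
        mask |= (uintptr_t)1 << id;
    }
    free(copy);

    /* At least 'fragments' distinct bricks must remain readable. */
    for (uintptr_t m = mask; m != 0; m >>= 1)
        bits += (int)(m & 1);
    if (bits < fragments)
        return -1;

    *out = mask;
    return 0;
}

int
main(void)
{
    uintptr_t mask = 0;

    /* On a 4+2 volume (6 bricks, 4 fragments), "0:2:3:5" selects bricks
     * 0, 2, 3 and 5: mask = 0b101101 = 0x2d. */
    if (parse_read_mask("0:2:3:5", 6, 4, &mask) == 0)
        printf("read mask = 0x%lx\n", (unsigned long)mask);

    return 0;
}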

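For the new quorum-count option, the description states the value should lie in [disperse-data-count, disperse-count]. The sketch below is illustrative only; write_succeeds() is a hypothetical model of the behaviour the option description implies, not the EC translator's actual fop accounting. It shows how the option raises the number of bricks a write must succeed on for a 4+2 volume.

#include <stdbool.h>
#include <stdio.h>

/* With quorum-count unset (0), a write needs at least 'data' good bricks;
 * with quorum-count set, it needs that many instead. */
static bool
write_succeeds(int good_bricks, int data, int quorum_count)
{
    int required = (quorum_count > 0) ? quorum_count : data;

    return good_bricks >= required;
}

int
main(void)
{
    /* 4+2 disperse volume: data = 4, disperse-count = 6, so quorum-count
     * may be 4, 5 or 6. */
    printf("default, 4 good bricks: %d\n", write_succeeds(4, 4, 0));  /* 1 */
    printf("quorum=5, 4 good bricks: %d\n", write_succeeds(4, 4, 5)); /* 0 */
    printf("quorum=5, 5 good bricks: %d\n", write_succeeds(5, 4, 5)); /* 1 */

    return 0;
}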