summaryrefslogtreecommitdiffstats
path: root/xlators/mgmt/glusterd/src/glusterd-syncop.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/mgmt/glusterd/src/glusterd-syncop.c')
-rw-r--r--xlators/mgmt/glusterd/src/glusterd-syncop.c829
1 files changed, 714 insertions, 115 deletions
diff --git a/xlators/mgmt/glusterd/src/glusterd-syncop.c b/xlators/mgmt/glusterd/src/glusterd-syncop.c
index aa1391df8..b36d6f616 100644
--- a/xlators/mgmt/glusterd/src/glusterd-syncop.c
+++ b/xlators/mgmt/glusterd/src/glusterd-syncop.c
@@ -17,8 +17,11 @@
#include "glusterd.h"
#include "glusterd-op-sm.h"
#include "glusterd-utils.h"
+#include "glusterd-locks.h"
-static inline void
+extern glusterd_op_info_t opinfo;
+
+void
gd_synctask_barrier_wait (struct syncargs *args, int count)
{
glusterd_conf_t *conf = THIS->private;
@@ -31,18 +34,137 @@ gd_synctask_barrier_wait (struct syncargs *args, int count)
}
static void
-gd_collate_errors (struct syncargs *args, int op_ret, int op_errno,
- char *op_errstr)
+gd_mgmt_v3_collate_errors (struct syncargs *args, int op_ret, int op_errno,
+ char *op_errstr, int op_code,
+ glusterd_peerinfo_t *peerinfo, u_char *uuid)
{
- if (args->op_ret)
- return;
- args->op_ret = op_ret;
- args->op_errno = op_errno;
- if (op_ret && op_errstr && strcmp (op_errstr, ""))
- args->errstr = gf_strdup (op_errstr);
+ char err_str[PATH_MAX] = "Please check log file for details.";
+ char op_err[PATH_MAX] = "";
+ char *peer_str = NULL;
+
+ if (op_ret) {
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ if (peerinfo)
+ peer_str = peerinfo->hostname;
+ else
+ peer_str = uuid_utoa (uuid);
+
+ if (op_errstr && strcmp (op_errstr, ""))
+ snprintf (err_str, sizeof(err_str) - 1,
+ "Error: %s", op_errstr);
+
+ switch (op_code) {
+ case GLUSTERD_MGMT_V3_LOCK:
+ {
+ snprintf (op_err, sizeof(op_err) - 1,
+ "Locking failed "
+ "on %s. %s", peer_str, err_str);
+ break;
+ }
+ case GLUSTERD_MGMT_V3_UNLOCK:
+ {
+ snprintf (op_err, sizeof(op_err) - 1,
+ "Unlocking failed "
+ "on %s. %s", peer_str, err_str);
+ break;
+ }
+ }
+
+ if (args->errstr) {
+ snprintf (err_str, sizeof(err_str) - 1,
+ "%s\n%s", args->errstr,
+ op_err);
+ GF_FREE (args->errstr);
+ args->errstr = NULL;
+ } else
+ snprintf (err_str, sizeof(err_str) - 1,
+ "%s", op_err);
+
+ gf_log ("", GF_LOG_ERROR, "%s", op_err);
+ args->errstr = gf_strdup (err_str);
+ }
+
+ return;
}
static void
+gd_collate_errors (struct syncargs *args, int op_ret, int op_errno,
+ char *op_errstr, int op_code,
+ glusterd_peerinfo_t *peerinfo, u_char *uuid)
+{
+ char err_str[PATH_MAX] = "Please check log file for details.";
+ char op_err[PATH_MAX] = "";
+ int len = -1;
+ char *peer_str = NULL;
+
+ if (op_ret) {
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ if (peerinfo)
+ peer_str = peerinfo->hostname;
+ else
+ peer_str = uuid_utoa (uuid);
+
+ if (op_errstr && strcmp (op_errstr, "")) {
+ len = snprintf (err_str, sizeof(err_str) - 1,
+ "Error: %s", op_errstr);
+ err_str[len] = '\0';
+ }
+
+ switch (op_code){
+ case GLUSTERD_MGMT_CLUSTER_LOCK :
+ {
+ len = snprintf (op_err, sizeof(op_err) - 1,
+ "Locking failed on %s. %s",
+ peer_str, err_str);
+ break;
+ }
+ case GLUSTERD_MGMT_CLUSTER_UNLOCK :
+ {
+ len = snprintf (op_err, sizeof(op_err) - 1,
+ "Unlocking failed on %s. %s",
+ peer_str, err_str);
+ break;
+ }
+ case GLUSTERD_MGMT_STAGE_OP :
+ {
+ len = snprintf (op_err, sizeof(op_err) - 1,
+ "Staging failed on %s. %s",
+ peer_str, err_str);
+ break;
+ }
+ case GLUSTERD_MGMT_COMMIT_OP :
+ {
+ len = snprintf (op_err, sizeof(op_err) - 1,
+ "Commit failed on %s. %s",
+ peer_str, err_str);
+ break;
+ }
+ }
+ op_err[len] = '\0';
+
+ if (args->errstr) {
+ len = snprintf (err_str, sizeof(err_str) - 1,
+ "%s\n%s", args->errstr,
+ op_err);
+ GF_FREE (args->errstr);
+ args->errstr = NULL;
+ } else
+ len = snprintf (err_str, sizeof(err_str) - 1,
+ "%s", op_err);
+ err_str[len] = '\0';
+
+ gf_log ("", GF_LOG_ERROR, "%s", op_err);
+ args->errstr = gf_strdup (err_str);
+ }
+
+ return;
+}
+
+void
gd_syncargs_init (struct syncargs *args, dict_t *op_ctx)
{
args->dict = op_ctx;
@@ -82,9 +204,9 @@ gd_brick_op_req_free (gd1_mgmt_brick_op_req *req)
}
int
-gd_syncop_submit_request (struct rpc_clnt *rpc, void *req,
- void *cookie, rpc_clnt_prog_t *prog,
- int procnum, fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
+gd_syncop_submit_request (struct rpc_clnt *rpc, void *req, void *local,
+ void *cookie, rpc_clnt_prog_t *prog, int procnum,
+ fop_cbk_fn_t cbkfn, xdrproc_t xdrproc)
{
int ret = -1;
struct iobuf *iobuf = NULL;
@@ -124,7 +246,8 @@ gd_syncop_submit_request (struct rpc_clnt *rpc, void *req,
iov.iov_len = ret;
count = 1;
- frame->local = cookie;
+ frame->local = local;
+ frame->cookie = cookie;
/* Send the msg */
ret = rpc_clnt_submit (rpc, prog, procnum, cbkfn,
@@ -143,8 +266,9 @@ out:
/* Defined in glusterd-rpc-ops.c */
extern struct rpc_clnt_program gd_mgmt_prog;
extern struct rpc_clnt_program gd_brick_prog;
+extern struct rpc_clnt_program gd_mgmt_v3_prog;
-static int
+int
glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp)
{
int ret = 0;
@@ -168,6 +292,9 @@ glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp)
goto out;
break;
+ case GD_OP_GSYNC_CREATE:
+ break;
+
case GD_OP_GSYNC_SET:
ret = glusterd_gsync_use_rsp_dict (aggr, rsp, NULL);
if (ret)
@@ -194,12 +321,28 @@ glusterd_syncop_aggr_rsp_dict (glusterd_op_t op, dict_t *aggr, dict_t *rsp)
break;
- case GD_OP_QUOTA:
case GD_OP_CLEARLOCKS_VOLUME:
ret = glusterd_use_rsp_dict (aggr, rsp);
if (ret)
goto out;
+ break;
+
+ case GD_OP_QUOTA:
+ ret = glusterd_volume_quota_copy_to_op_ctx_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+ case GD_OP_SYS_EXEC:
+ ret = glusterd_sys_exec_output_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
+ break;
+
+ case GD_OP_SNAP:
+ ret = glusterd_snap_use_rsp_dict (aggr, rsp);
+ if (ret)
+ goto out;
break;
default:
@@ -210,20 +353,204 @@ out:
}
int32_t
+gd_syncop_mgmt_v3_lock_cbk_fn (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ int ret = -1;
+ struct syncargs *args = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ gd1_mgmt_v3_lock_rsp rsp = {{0},};
+ call_frame_t *frame = NULL;
+ int op_ret = -1;
+ int op_errno = -1;
+
+ GF_ASSERT(req);
+ GF_ASSERT(iov);
+ GF_ASSERT(myframe);
+
+ frame = myframe;
+ args = frame->local;
+ peerinfo = frame->cookie;
+ frame->local = NULL;
+ frame->cookie = NULL;
+
+ if (-1 == req->rpc_status) {
+ op_errno = ENOTCONN;
+ goto out;
+ }
+
+ ret = xdr_to_generic (*iov, &rsp,
+ (xdrproc_t)xdr_gd1_mgmt_v3_lock_rsp);
+ if (ret < 0)
+ goto out;
+
+ uuid_copy (args->uuid, rsp.uuid);
+
+ op_ret = rsp.op_ret;
+ op_errno = rsp.op_errno;
+out:
+ gd_mgmt_v3_collate_errors (args, op_ret, op_errno, NULL,
+ GLUSTERD_MGMT_V3_LOCK,
+ peerinfo, rsp.uuid);
+ STACK_DESTROY (frame->root);
+ synctask_barrier_wake(args);
+ return 0;
+}
+
+int32_t
+gd_syncop_mgmt_v3_lock_cbk (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ return glusterd_big_locked_cbk (req, iov, count, myframe,
+ gd_syncop_mgmt_v3_lock_cbk_fn);
+}
+
+int
+gd_syncop_mgmt_v3_lock (glusterd_op_t op, dict_t *op_ctx,
+ glusterd_peerinfo_t *peerinfo,
+ struct syncargs *args, uuid_t my_uuid,
+ uuid_t recv_uuid, uuid_t txn_id)
+{
+ int ret = -1;
+ gd1_mgmt_v3_lock_req req = {{0},};
+ glusterd_conf_t *conf = THIS->private;
+
+ GF_ASSERT(op_ctx);
+ GF_ASSERT(peerinfo);
+ GF_ASSERT(args);
+
+ ret = dict_allocate_and_serialize (op_ctx,
+ &req.dict.dict_val,
+ &req.dict.dict_len);
+ if (ret)
+ goto out;
+
+ uuid_copy (req.uuid, my_uuid);
+ uuid_copy (req.txn_id, txn_id);
+ req.op = op;
+ synclock_unlock (&conf->big_lock);
+ ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo,
+ &gd_mgmt_v3_prog,
+ GLUSTERD_MGMT_V3_LOCK,
+ gd_syncop_mgmt_v3_lock_cbk,
+ (xdrproc_t)
+ xdr_gd1_mgmt_v3_lock_req);
+ synclock_lock (&conf->big_lock);
+out:
+ gf_log ("", GF_LOG_DEBUG, "Returning %d", ret);
+ return ret;
+}
+
+int32_t
+gd_syncop_mgmt_v3_unlock_cbk_fn (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ int ret = -1;
+ struct syncargs *args = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ gd1_mgmt_v3_unlock_rsp rsp = {{0},};
+ call_frame_t *frame = NULL;
+ int op_ret = -1;
+ int op_errno = -1;
+
+ GF_ASSERT(req);
+ GF_ASSERT(iov);
+ GF_ASSERT(myframe);
+
+ frame = myframe;
+ args = frame->local;
+ peerinfo = frame->cookie;
+ frame->local = NULL;
+ frame->cookie = NULL;
+
+ if (-1 == req->rpc_status) {
+ op_errno = ENOTCONN;
+ goto out;
+ }
+
+ ret = xdr_to_generic (*iov, &rsp,
+ (xdrproc_t)xdr_gd1_mgmt_v3_unlock_rsp);
+ if (ret < 0)
+ goto out;
+
+ uuid_copy (args->uuid, rsp.uuid);
+
+ /* Set peer as locked, so we unlock only the locked peers */
+ if (rsp.op_ret == 0)
+ peerinfo->locked = _gf_true;
+ op_ret = rsp.op_ret;
+ op_errno = rsp.op_errno;
+out:
+ gd_mgmt_v3_collate_errors (args, op_ret, op_errno, NULL,
+ GLUSTERD_MGMT_V3_UNLOCK,
+ peerinfo, rsp.uuid);
+ STACK_DESTROY (frame->root);
+ synctask_barrier_wake(args);
+ return 0;
+}
+
+int32_t
+gd_syncop_mgmt_v3_unlock_cbk (struct rpc_req *req, struct iovec *iov,
+ int count, void *myframe)
+{
+ return glusterd_big_locked_cbk (req, iov, count, myframe,
+ gd_syncop_mgmt_v3_unlock_cbk_fn);
+}
+
+int
+gd_syncop_mgmt_v3_unlock (dict_t *op_ctx, glusterd_peerinfo_t *peerinfo,
+ struct syncargs *args, uuid_t my_uuid,
+ uuid_t recv_uuid, uuid_t txn_id)
+{
+ int ret = -1;
+ gd1_mgmt_v3_unlock_req req = {{0},};
+ glusterd_conf_t *conf = THIS->private;
+
+ GF_ASSERT(op_ctx);
+ GF_ASSERT(peerinfo);
+ GF_ASSERT(args);
+
+ ret = dict_allocate_and_serialize (op_ctx,
+ &req.dict.dict_val,
+ &req.dict.dict_len);
+ if (ret)
+ goto out;
+
+ uuid_copy (req.uuid, my_uuid);
+ uuid_copy (req.txn_id, txn_id);
+ synclock_unlock (&conf->big_lock);
+ ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo,
+ &gd_mgmt_v3_prog,
+ GLUSTERD_MGMT_V3_UNLOCK,
+ gd_syncop_mgmt_v3_unlock_cbk,
+ (xdrproc_t)
+ xdr_gd1_mgmt_v3_unlock_req);
+ synclock_lock (&conf->big_lock);
+out:
+ gf_log ("", GF_LOG_DEBUG, "Returning %d", ret);
+ return ret;
+}
+
+int32_t
_gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
int ret = -1;
struct syncargs *args = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
gd1_mgmt_cluster_lock_rsp rsp = {{0},};
call_frame_t *frame = NULL;
+ int op_ret = -1;
+ int op_errno = -1;
frame = myframe;
args = frame->local;
+ peerinfo = frame->cookie;
frame->local = NULL;
+ frame->cookie = NULL;
if (-1 == req->rpc_status) {
- args->op_errno = ENOTCONN;
+ op_errno = ENOTCONN;
goto out;
}
@@ -232,25 +559,31 @@ _gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
if (ret < 0)
goto out;
- gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);
uuid_copy (args->uuid, rsp.uuid);
+ /* Set peer as locked, so we unlock only the locked peers */
+ if (rsp.op_ret == 0)
+ peerinfo->locked = _gf_true;
+ op_ret = rsp.op_ret;
+ op_errno = rsp.op_errno;
out:
+ gd_collate_errors (args, op_ret, op_errno, NULL,
+ GLUSTERD_MGMT_CLUSTER_LOCK, peerinfo, rsp.uuid);
STACK_DESTROY (frame->root);
synctask_barrier_wake(args);
return 0;
}
int32_t
-gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov,
- int count, void *myframe)
+gd_syncop_mgmt_lock_cbk (struct rpc_req *req, struct iovec *iov, int count,
+ void *myframe)
{
return glusterd_big_locked_cbk (req, iov, count, myframe,
_gd_syncop_mgmt_lock_cbk);
}
int
-gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *args,
+gd_syncop_mgmt_lock (glusterd_peerinfo_t *peerinfo, struct syncargs *args,
uuid_t my_uuid, uuid_t recv_uuid)
{
int ret = -1;
@@ -259,7 +592,8 @@ gd_syncop_mgmt_lock (struct rpc_clnt *rpc, struct syncargs *args,
uuid_copy (req.uuid, my_uuid);
synclock_unlock (&conf->big_lock);
- ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog,
+ ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo,
+ &gd_mgmt_prog,
GLUSTERD_MGMT_CLUSTER_LOCK,
gd_syncop_mgmt_lock_cbk,
(xdrproc_t) xdr_gd1_mgmt_cluster_lock_req);
@@ -273,15 +607,19 @@ _gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,
{
int ret = -1;
struct syncargs *args = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
gd1_mgmt_cluster_unlock_rsp rsp = {{0},};
call_frame_t *frame = NULL;
+ int op_ret = -1;
+ int op_errno = -1;
frame = myframe;
args = frame->local;
+ peerinfo = frame->cookie;
frame->local = NULL;
if (-1 == req->rpc_status) {
- args->op_errno = ENOTCONN;
+ op_errno = ENOTCONN;
goto out;
}
@@ -290,10 +628,14 @@ _gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,
if (ret < 0)
goto out;
- gd_collate_errors (args, rsp.op_ret, rsp.op_errno, NULL);
uuid_copy (args->uuid, rsp.uuid);
+ peerinfo->locked = _gf_false;
+ op_ret = rsp.op_ret;
+ op_errno = rsp.op_errno;
out:
+ gd_collate_errors (args, op_ret, op_errno, NULL,
+ GLUSTERD_MGMT_CLUSTER_UNLOCK, peerinfo, rsp.uuid);
STACK_DESTROY (frame->root);
synctask_barrier_wake(args);
return 0;
@@ -309,7 +651,7 @@ gd_syncop_mgmt_unlock_cbk (struct rpc_req *req, struct iovec *iov,
int
-gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *args,
+gd_syncop_mgmt_unlock (glusterd_peerinfo_t *peerinfo, struct syncargs *args,
uuid_t my_uuid, uuid_t recv_uuid)
{
int ret = -1;
@@ -318,7 +660,8 @@ gd_syncop_mgmt_unlock (struct rpc_clnt *rpc, struct syncargs *args,
uuid_copy (req.uuid, my_uuid);
synclock_unlock (&conf->big_lock);
- ret = gd_syncop_submit_request (rpc, &req, args, &gd_mgmt_prog,
+ ret = gd_syncop_submit_request (peerinfo->rpc, &req, args, peerinfo,
+ &gd_mgmt_prog,
GLUSTERD_MGMT_CLUSTER_UNLOCK,
gd_syncop_mgmt_unlock_cbk,
(xdrproc_t) xdr_gd1_mgmt_cluster_lock_req);
@@ -336,6 +679,9 @@ _gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov,
xlator_t *this = NULL;
dict_t *rsp_dict = NULL;
call_frame_t *frame = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ int op_ret = -1;
+ int op_errno = -1;
this = THIS;
frame = myframe;
@@ -343,8 +689,7 @@ _gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov,
frame->local = NULL;
if (-1 == req->rpc_status) {
- args->op_ret = -1;
- args->op_errno = ENOTCONN;
+ op_errno = ENOTCONN;
goto out;
}
@@ -368,9 +713,17 @@ _gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov,
}
}
- gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);
+ ret = glusterd_friend_find (rsp.uuid, NULL, &peerinfo);
+ if (ret) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Staging response "
+ "for 'Volume %s' received from unknown "
+ "peer: %s", gd_op_list[rsp.op],
+ uuid_utoa (rsp.uuid));
+ goto out;
+ }
+
uuid_copy (args->uuid, rsp.uuid);
- if (rsp.op == GD_OP_REPLACE_BRICK) {
+ if (rsp.op == GD_OP_REPLACE_BRICK || rsp.op == GD_OP_QUOTA) {
pthread_mutex_lock (&args->lock_dict);
{
ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict,
@@ -383,7 +736,13 @@ _gd_syncop_stage_op_cbk (struct rpc_req *req, struct iovec *iov,
pthread_mutex_unlock (&args->lock_dict);
}
+ op_ret = rsp.op_ret;
+ op_errno = rsp.op_errno;
+
out:
+ gd_collate_errors (args, op_ret, op_errno, rsp.op_errstr,
+ GLUSTERD_MGMT_STAGE_OP, peerinfo, rsp.uuid);
+
if (rsp_dict)
dict_unref (rsp_dict);
@@ -423,7 +782,7 @@ gd_syncop_mgmt_stage_op (struct rpc_clnt *rpc, struct syncargs *args,
goto out;
synclock_unlock (&conf->big_lock);
- ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog,
+ ret = gd_syncop_submit_request (rpc, req, args, NULL, &gd_mgmt_prog,
GLUSTERD_MGMT_STAGE_OP,
gd_syncop_stage_op_cbk,
(xdrproc_t) xdr_gd1_mgmt_stage_op_req);
@@ -514,8 +873,8 @@ gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,
args.op_errno = ENOTCONN;
if ((pnode->type == GD_NODE_NFS) ||
- ((pnode->type == GD_NODE_SHD) &&
- (op == GD_OP_STATUS_VOLUME))) {
+ (pnode->type == GD_NODE_QUOTAD) ||
+ ((pnode->type == GD_NODE_SHD) && (op == GD_OP_STATUS_VOLUME))) {
ret = glusterd_node_op_build_payload
(op, &req, dict_out);
@@ -528,14 +887,15 @@ gd_syncop_mgmt_brick_op (struct rpc_clnt *rpc, glusterd_pending_node_t *pnode,
if (ret)
goto out;
- GD_SYNCOP (rpc, (&args), gd_syncop_brick_op_cbk,
- req, &gd_brick_prog, req->op,
- xdr_gd1_mgmt_brick_op_req);
+ GD_SYNCOP (rpc, (&args), NULL, gd_syncop_brick_op_cbk, req,
+ &gd_brick_prog, req->op, xdr_gd1_mgmt_brick_op_req);
- if (args.errstr && errstr)
- *errstr = args.errstr;
- else
- GF_FREE (args.errstr);
+ if (args.errstr) {
+ if ((strlen(args.errstr) > 0) && errstr)
+ *errstr = args.errstr;
+ else
+ GF_FREE (args.errstr);
+ }
if (GD_OP_STATUS_VOLUME == op) {
ret = dict_set_int32 (args.dict, "index", pnode->index);
@@ -565,12 +925,16 @@ int32_t
_gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
int count, void *myframe)
{
- int ret = -1;
- gd1_mgmt_commit_op_rsp rsp = {{0},};
- struct syncargs *args = NULL;
- xlator_t *this = NULL;
- dict_t *rsp_dict = NULL;
- call_frame_t *frame = NULL;
+ int ret = -1;
+ gd1_mgmt_commit_op_rsp rsp = {{0},};
+ struct syncargs *args = NULL;
+ xlator_t *this = NULL;
+ dict_t *rsp_dict = NULL;
+ call_frame_t *frame = NULL;
+ glusterd_peerinfo_t *peerinfo = NULL;
+ int op_ret = -1;
+ int op_errno = -1;
+ int type = GF_QUOTA_OPTION_TYPE_NONE;
this = THIS;
frame = myframe;
@@ -578,7 +942,7 @@ _gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
frame->local = NULL;
if (-1 == req->rpc_status) {
- args->op_errno = ENOTCONN;
+ op_errno = ENOTCONN;
goto out;
}
@@ -603,19 +967,44 @@ _gd_syncop_commit_op_cbk (struct rpc_req *req, struct iovec *iov,
}
}
- gd_collate_errors (args, rsp.op_ret, rsp.op_errno, rsp.op_errstr);
+ ret = glusterd_friend_find (rsp.uuid, NULL, &peerinfo);
+ if (ret) {
+ gf_log (this->name, GF_LOG_CRITICAL, "Commit response "
+ "for 'Volume %s' received from unknown "
+ "peer: %s", gd_op_list[rsp.op],
+ uuid_utoa (rsp.uuid));
+ goto out;
+ }
+
uuid_copy (args->uuid, rsp.uuid);
- pthread_mutex_lock (&args->lock_dict);
- {
- ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict,
- rsp_dict);
- if (ret)
- gf_log (this->name, GF_LOG_ERROR, "%s",
- "Failed to aggregate response from "
- " node/brick");
+ if (rsp.op == GD_OP_QUOTA) {
+ ret = dict_get_int32 (args->dict, "type", &type);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to get "
+ "opcode");
+ goto out;
+ }
+ }
+
+ if ((rsp.op != GD_OP_QUOTA) || (type == GF_QUOTA_OPTION_TYPE_LIST)) {
+ pthread_mutex_lock (&args->lock_dict);
+ {
+ ret = glusterd_syncop_aggr_rsp_dict (rsp.op, args->dict,
+ rsp_dict);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR, "%s",
+ "Failed to aggregate response from "
+ " node/brick");
+ }
+ pthread_mutex_unlock (&args->lock_dict);
}
- pthread_mutex_unlock (&args->lock_dict);
+
+ op_ret = rsp.op_ret;
+ op_errno = rsp.op_errno;
+
out:
+ gd_collate_errors (args, op_ret, op_errno, rsp.op_errstr,
+ GLUSTERD_MGMT_COMMIT_OP, peerinfo, rsp.uuid);
if (rsp_dict)
dict_unref (rsp_dict);
@@ -656,7 +1045,7 @@ gd_syncop_mgmt_commit_op (struct rpc_clnt *rpc, struct syncargs *args,
goto out;
synclock_unlock (&conf->big_lock);
- ret = gd_syncop_submit_request (rpc, req, args, &gd_mgmt_prog,
+ ret = gd_syncop_submit_request (rpc, req, args, NULL, &gd_mgmt_prog,
GLUSTERD_MGMT_COMMIT_OP ,
gd_syncop_commit_op_cbk,
(xdrproc_t) xdr_gd1_mgmt_commit_op_req);
@@ -688,8 +1077,8 @@ gd_build_peers_list (struct list_head *peers, struct list_head *xact_peers,
}
int
-gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
- char **op_errstr, int npeers)
+gd_lock_op_phase (glusterd_conf_t *conf, glusterd_op_t op, dict_t *op_ctx,
+ char **op_errstr, int npeers, uuid_t txn_id)
{
int ret = -1;
int peer_cnt = 0;
@@ -697,6 +1086,9 @@ gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
xlator_t *this = NULL;
glusterd_peerinfo_t *peerinfo = NULL;
struct syncargs args = {0};
+ struct list_head *peers = NULL;
+
+ peers = &conf->xaction_peers;
if (!npeers) {
ret = 0;
@@ -707,20 +1099,38 @@ gd_lock_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
synctask_barrier_init((&args));
peer_cnt = 0;
list_for_each_entry (peerinfo, peers, op_peers_list) {
- gd_syncop_mgmt_lock (peerinfo->rpc, &args, MY_UUID, peer_uuid);
+ if (conf->op_version < GD_OP_VERSION_4) {
+ /* Reset lock status */
+ peerinfo->locked = _gf_false;
+ gd_syncop_mgmt_lock (peerinfo, &args,
+ MY_UUID, peer_uuid);
+ } else
+ gd_syncop_mgmt_v3_lock (op, op_ctx, peerinfo, &args,
+ MY_UUID, peer_uuid, txn_id);
peer_cnt++;
}
gd_synctask_barrier_wait((&args), peer_cnt);
- ret = args.op_ret;
- if (ret) {
- gf_asprintf (op_errstr, "Another transaction could be "
- "in progress. Please try again after "
- "sometime.");
- gf_log (this->name, GF_LOG_ERROR, "Failed to acquire lock");
- goto out;
+
+ if (args.op_ret) {
+ if (args.errstr)
+ *op_errstr = gf_strdup (args.errstr);
+ else {
+ ret = gf_asprintf (op_errstr, "Another transaction "
+ "could be in progress. Please try "
+ "again after sometime.");
+ if (ret == -1)
+ *op_errstr = NULL;
+
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to acquire lock");
+
+ }
}
- ret = 0;
+ ret = args.op_ret;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Sent lock op req for 'Volume %s' "
+ "to %d peers. Returning %d", gd_op_list[op], peer_cnt, ret);
out:
return ret;
}
@@ -750,7 +1160,7 @@ gd_stage_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
goto stage_done;
}
- if ((op == GD_OP_REPLACE_BRICK)) {
+ if ((op == GD_OP_REPLACE_BRICK || op == GD_OP_QUOTA)) {
ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "%s",
@@ -785,14 +1195,27 @@ stage_done:
op, req_dict, op_ctx);
peer_cnt++;
}
+
+ gf_log (this->name, GF_LOG_DEBUG, "Sent stage op req for 'Volume %s' "
+ "to %d peers", gd_op_list[op], peer_cnt);
+
gd_synctask_barrier_wait((&args), peer_cnt);
- ret = args.op_ret;
- if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
+
+ if (args.errstr)
+ *op_errstr = gf_strdup (args.errstr);
+ else if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
*op_errstr = gf_strdup (errstr);
- else if (args.errstr)
- *op_errstr = gf_strdup (args.errstr);
+
+ ret = args.op_ret;
out:
+ if ((ret == 0) && (op == GD_OP_QUOTA)) {
+ ret = glusterd_validate_and_set_gfid (op_ctx, req_dict,
+ op_errstr);
+ if (ret)
+ goto out;
+ }
+
if (rsp_dict)
dict_unref (rsp_dict);
return ret;
@@ -811,6 +1234,7 @@ gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
uuid_t tmp_uuid = {0};
char *errstr = NULL;
struct syncargs args = {0};
+ int type = GF_QUOTA_OPTION_TYPE_NONE;
this = THIS;
rsp_dict = dict_new ();
@@ -824,15 +1248,28 @@ gd_commit_op_phase (struct list_head *peers, glusterd_op_t op, dict_t *op_ctx,
hostname = "localhost";
goto commit_done;
}
- if (op != GD_OP_SYNC_VOLUME) {
- ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx, rsp_dict);
+
+ if (op == GD_OP_QUOTA) {
+ ret = dict_get_int32 (op_ctx, "type", &type);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "%s",
- "Failed to aggregate response "
- "from node/brick");
+ gf_log (this->name, GF_LOG_ERROR, "Failed to get "
+ "opcode");
goto out;
}
}
+
+ if (((op == GD_OP_QUOTA) && (type == GF_QUOTA_OPTION_TYPE_LIST)) ||
+ ((op != GD_OP_SYNC_VOLUME) && (op != GD_OP_QUOTA))) {
+
+ ret = glusterd_syncop_aggr_rsp_dict (op, op_ctx,
+ rsp_dict);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "%s", "Failed to aggregate "
+ "response from node/brick");
+ goto out;
+ }
+ }
+
dict_unref (rsp_dict);
rsp_dict = NULL;
@@ -851,6 +1288,7 @@ commit_done:
ret = 0;
goto out;
}
+
gd_syncargs_init (&args, op_ctx);
synctask_barrier_init((&args));
peer_cnt = 0;
@@ -862,57 +1300,120 @@ commit_done:
}
gd_synctask_barrier_wait((&args), peer_cnt);
ret = args.op_ret;
- if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
+ if (args.errstr)
+ *op_errstr = gf_strdup (args.errstr);
+ else if (dict_get_str (op_ctx, "errstr", &errstr) == 0)
*op_errstr = gf_strdup (errstr);
- else if (args.errstr)
- *op_errstr = gf_strdup (args.errstr);
+ gf_log (this->name, GF_LOG_DEBUG, "Sent commit op req for 'Volume %s' "
+ "to %d peers", gd_op_list[op], peer_cnt);
out:
if (!ret)
glusterd_op_modify_op_ctx (op, op_ctx);
if (rsp_dict)
dict_unref (rsp_dict);
+
+ GF_FREE (args.errstr);
+ args.errstr = NULL;
+
return ret;
}
int
-gd_unlock_op_phase (struct list_head *peers, glusterd_op_t op, int op_ret,
+gd_unlock_op_phase (glusterd_conf_t *conf, glusterd_op_t op, int *op_ret,
rpcsvc_request_t *req, dict_t *op_ctx, char *op_errstr,
- int npeers)
+ int npeers, char *volname, gf_boolean_t is_acquired,
+ uuid_t txn_id)
{
glusterd_peerinfo_t *peerinfo = NULL;
glusterd_peerinfo_t *tmp = NULL;
uuid_t tmp_uuid = {0};
- int peer_cnt = 0;
+ int peer_cnt = 0;
int ret = -1;
xlator_t *this = NULL;
struct syncargs args = {0};
+ struct list_head *peers = NULL;
+
+ peers = &conf->xaction_peers;
if (!npeers) {
ret = 0;
goto out;
}
+ /* If the lock has not been held during this
+ * transaction, do not send unlock requests */
+ if (!is_acquired) {
+ ret = 0;
+ goto out;
+ }
+
this = THIS;
synctask_barrier_init((&args));
peer_cnt = 0;
- list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) {
- gd_syncop_mgmt_unlock (peerinfo->rpc, &args, MY_UUID, tmp_uuid);
- list_del_init (&peerinfo->op_peers_list);
- peer_cnt++;
+ if (conf->op_version < GD_OP_VERSION_4) {
+ list_for_each_entry_safe (peerinfo, tmp, peers, op_peers_list) {
+ /* Only unlock peers that were locked */
+ if (peerinfo->locked) {
+ gd_syncop_mgmt_unlock (peerinfo, &args,
+ MY_UUID, tmp_uuid);
+ peer_cnt++;
+ list_del_init (&peerinfo->op_peers_list);
+ }
+ }
+ } else {
+ if (volname) {
+ list_for_each_entry_safe (peerinfo, tmp,
+ peers, op_peers_list) {
+ gd_syncop_mgmt_v3_unlock (op_ctx, peerinfo,
+ &args, MY_UUID,
+ tmp_uuid, txn_id);
+ peer_cnt++;
+ list_del_init (&peerinfo->op_peers_list);
+ }
+ }
}
gd_synctask_barrier_wait((&args), peer_cnt);
+
ret = args.op_ret;
+
+ gf_log (this->name, GF_LOG_DEBUG, "Sent unlock op req for 'Volume %s' "
+ "to %d peers. Returning %d", gd_op_list[op], peer_cnt, ret);
if (ret) {
gf_log (this->name, GF_LOG_ERROR, "Failed to unlock "
"on some peer(s)");
}
out:
- glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr);
- glusterd_op_clear_op (op);
- glusterd_unlock (MY_UUID);
+ /* If unlock failed, and op_ret was previously set
+ * priority is given to the op_ret. If op_ret was
+ * not set, and unlock failed, then set op_ret */
+ if (!*op_ret)
+ *op_ret = ret;
+
+ if (is_acquired) {
+ /* Based on the op-version,
+ * we release the cluster or mgmt_v3 lock
+ * and clear the op */
+
+ glusterd_op_clear_op (op);
+ if (conf->op_version < GD_OP_VERSION_4)
+ glusterd_unlock (MY_UUID);
+ else {
+ if (volname) {
+ ret = glusterd_mgmt_v3_unlock (volname, MY_UUID,
+ "vol");
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Unable to release lock for %s",
+ volname);
+ }
+ }
+ }
+
+ if (!*op_ret)
+ *op_ret = ret;
return 0;
}
@@ -929,7 +1430,8 @@ gd_get_brick_count (struct list_head *bricks)
}
int
-gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict, char **op_errstr)
+gd_brick_op_phase (glusterd_op_t op, dict_t *op_ctx, dict_t *req_dict,
+ char **op_errstr)
{
glusterd_pending_node_t *pending_node = NULL;
struct list_head selected = {0,};
@@ -1001,14 +1503,20 @@ out:
void
gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
{
- int ret = -1;
- int npeers = 0;
- dict_t *req_dict = NULL;
- glusterd_conf_t *conf = NULL;
- glusterd_op_t op = 0;
- int32_t tmp_op = 0;
- char *op_errstr = NULL;
- xlator_t *this = NULL;
+ int ret = -1;
+ int op_ret = -1;
+ int npeers = 0;
+ dict_t *req_dict = NULL;
+ glusterd_conf_t *conf = NULL;
+ glusterd_op_t op = 0;
+ int32_t tmp_op = 0;
+ char *op_errstr = NULL;
+ char *tmp = NULL;
+ char *volname = NULL;
+ xlator_t *this = NULL;
+ gf_boolean_t is_acquired = _gf_false;
+ uuid_t *txn_id = NULL;
+ glusterd_op_info_t txn_opinfo;
this = THIS;
GF_ASSERT (this);
@@ -1021,26 +1529,97 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
"operation");
goto out;
}
-
op = tmp_op;
- ret = glusterd_lock (MY_UUID);
+
+ /* Generate a transaction-id for this operation and
+ * save it in the dict */
+ ret = glusterd_generate_txn_id (op_ctx, &txn_id);
if (ret) {
- gf_log (this->name, GF_LOG_ERROR, "Unable to acquire lock");
- gf_asprintf (&op_errstr, "Another transaction is in progress. "
- "Please try again after sometime.");
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to generate transaction id");
goto out;
}
- /* storing op globally to access in synctask code paths
- * This is still acceptable, as we are performing this under
- * the 'cluster' lock*/
- glusterd_op_set_op (op);
+ /* Save opinfo for this transaction with the transaction id */
+ glusterd_txn_opinfo_init (&txn_opinfo, NULL, &op, NULL, NULL);
+ ret = glusterd_set_txn_opinfo (txn_id, &txn_opinfo);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Unable to set transaction's opinfo");
+
+ gf_log (this->name, GF_LOG_DEBUG,
+ "Transaction ID : %s", uuid_utoa (*txn_id));
+
+ opinfo = txn_opinfo;
+
+ /* Save the MY_UUID as the originator_uuid */
+ ret = glusterd_set_originator_uuid (op_ctx);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to set originator_uuid.");
+ goto out;
+ }
+
+ /* Based on the op_version, acquire a cluster or mgmt_v3 lock */
+ if (conf->op_version < GD_OP_VERSION_4) {
+ ret = glusterd_lock (MY_UUID);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Unable to acquire lock");
+ gf_asprintf (&op_errstr,
+ "Another transaction is in progress. "
+ "Please try again after sometime.");
+ goto out;
+ }
+ } else {
+
+ /* If no volname is given as a part of the command, locks will
+ * not be held */
+ ret = dict_get_str (op_ctx, "volname", &tmp);
+ if (ret) {
+ gf_log ("", GF_LOG_DEBUG, "Failed to get volume "
+ "name");
+ goto local_locking_done;
+ } else {
+ /* Use a copy of volname, as cli response will be
+ * sent before the unlock, and the volname in the
+ * dict, might be removed */
+ volname = gf_strdup (tmp);
+ if (!volname)
+ goto out;
+ }
+
+ ret = glusterd_mgmt_v3_lock (volname, MY_UUID, "vol");
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Unable to acquire lock for %s", volname);
+ gf_asprintf (&op_errstr,
+ "Another transaction is in progress "
+ "for %s. Please try again after sometime.",
+ volname);
+ goto out;
+ }
+ }
+
+ is_acquired = _gf_true;
+
+local_locking_done:
+
INIT_LIST_HEAD (&conf->xaction_peers);
+
npeers = gd_build_peers_list (&conf->peers, &conf->xaction_peers, op);
- ret = gd_lock_op_phase (&conf->xaction_peers, op, op_ctx, &op_errstr, npeers);
- if (ret)
- goto out;
+ /* If no volname is given as a part of the command, locks will
+ * not be held */
+ if (volname || (conf->op_version < GD_OP_VERSION_4)) {
+ ret = gd_lock_op_phase (conf, op, op_ctx, &op_errstr,
+ npeers, *txn_id);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Locking Peers Failed.");
+ goto out;
+ }
+ }
ret = glusterd_op_build_payload (&req_dict, &op_errstr, op_ctx);
if (ret) {
@@ -1067,14 +1646,34 @@ gd_sync_task_begin (dict_t *op_ctx, rpcsvc_request_t * req)
ret = 0;
out:
- (void) gd_unlock_op_phase (&conf->xaction_peers, op, ret, req,
- op_ctx, op_errstr, npeers);
+ op_ret = ret;
+ if (txn_id) {
+ (void) gd_unlock_op_phase (conf, op, &op_ret, req,
+ op_ctx, op_errstr,
+ npeers, volname,
+ is_acquired, *txn_id);
+
+ /* Clearing the transaction opinfo */
+ ret = glusterd_clear_txn_opinfo (txn_id);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Unable to clear transaction's "
+ "opinfo for transaction ID : %s",
+ uuid_utoa (*txn_id));
+ }
+
+ glusterd_op_send_cli_response (op, op_ret, 0, req, op_ctx, op_errstr);
+
+ if (volname)
+ GF_FREE (volname);
if (req_dict)
dict_unref (req_dict);
- if (op_errstr)
+ if (op_errstr) {
GF_FREE (op_errstr);
+ op_errstr = NULL;
+ }
return;
}