当前位置 : 主页 > 编程语言 > python >

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)

来源:互联网 收集:自由互联 发布时间:2022-06-18
​目录​ ​​后端部分 Admin Socket​​ ​​前端部分python​​ ceph 命令下发流程:命令行/usr/bin/ceph--python 前端部分处理--Admin Socket后端部分处理 后端部分 Admin Socket ​ 前端部分python ​


​目录​


​​后端部分 Admin Socket​​

​​前端部分 python​​


ceph 命令下发流程:命令行/usr/bin/ceph-->python 前端部分处理-->Admin Socket后端部分处理

后端部分 Admin Socket

前端部分 python

​从命令行到python到C语言的过程​


既然讲了admin_socket的的后台部分,那前端输入命令到底是怎么去调用的后台呢,或者前台的命令到底是怎么发送的呢?平时的ceph命令到底是怎么解析的呢?

在终端敲入了ceph stastus,终端就返回了集群的状态。那在这个过程中,到底是调用了什么。

主要涉及了

ceph-12.2.2\src\ceph.in

ceph-12.2.2\src\pybind\ceph_argparse.py

ceph-12.2.2\src\pybind\rados\rados.pyx

这里的pybind目录基本ceph所有的python脚本都在该目录下。

首先ceph命令都是由/usr/bin/ceph来执行,打开该文件发现内容与ceph.in相同,即这个脚本是由源码里面的ceph.in文件生成而来。

而在ceph.in中import了rados,


import rados
···
cluster_handle = run_in_thread(rados.Rados,name=name,
    clustername=clustername,conf_defaults=conf_defaults,
    conffile=conffile)
···

json_command(cluster_handle, target=target, argdict=valid_dict,inbuf=inbuf)


json_command在src\pybind\ceph_argparse.py中定义。


def json_command(cluster,
                 target: Optional[Tuple[str, str]] = ('mon', ''),
                 prefix: Optional[str] = None,
                 argdict: Optional[Dict[str, str]] = None,
                 inbuf: Optional[bytes] = b'',
                 timeout: Optional[int] = 0,
                 verbose: Optional[bool] = False) -> Tuple[int, bytes, str]:
   ……

    try:
     ……
        ret, outbuf, outs = send_command_retry(cluster,
                                               target, json.dumps(cmddict),
                                               inbuf, timeout, verbose)



def send_command_retry(*args, **kwargs):
    while True:
        try:
            return send_command(*args, **kwargs)
        except Exception as e:
            if ('get_command_descriptions' in str(e) and
                'object in state configuring' in str(e)):
                continue
            else:
                raise



def send_command(cluster, target=('mon', ''), cmd=None,
                  inbuf=b'', timeout=0,verbose=False):
···
   if len(target) < 2 or target[1] == '': 
      ret, outbuf, outs = run_in_thread(cluster.mon_command,
                                  cmd, inbuf, timeout)
   else:
      ret, outbuf, outs = run_in_thread(cluster.mon_command,
                           cmd, inbuf, timeout, target[1])
···                  


调用了src\pybind\rados\rados.pyx的rados类中的mon_command():cluster.mon_command


cdef class Rados(object):
    """This class wraps librados functions"""
    # NOTE(sileht): attributes declared in .pyd

    def __init__(self, *args, **kwargs):
        PyEval_InitThreads()
           self.__setup(*args, **kwargs)
    def mon_command(self, cmd, inbuf, timeout=0, 
                    target=None):
    ···
 



def mon_command(self, cmd, inbuf, timeout=0, target=None):
    ……
    try:
            if target:
                 with nogil:
                      ret =rados_mon_command_target(self.cluster,
                       _target,<const char **>_cmd,
                       _cmdlen,<const char*>_inbuf,
                       _inbuf_len,&_outbuf, &_outbuf_len,
                       &_outs, &_outs_len)
            else:
                 with nogil:
                      ret = rados_mon_command(self.cluster,
                          <const char **>_cmd, _cmdlen,
                          <const char*>_inbuf, _inbuf_len,
                          &_outbuf, &_outbuf_len,&_outs,
                          &_outs_len)
    ……


根据src\pybind\rados\rados.pyx文件中


cdef extern from "rados/librados.h" nogil:
····
int rados_mon_command(rados_t cluster, const char **cmd,
                     size_t cmdlen,const char *inbuf, 
                     size_t inbuflen,char **outbuf, 
                     size_t *outbuflen,char **outs, 
                     size_t *outslen)
···


千辛万苦到了ceph的C++代码部分啦

这波骚操作令人窒息。笔者作为小菜鸡,第一次看通的时候也是长舒了一口气。

下面是常规操作。

librados.h


extern "C" int rados_mon_command(rados_t cluster, 
                            const char **cmd,size_t cmdlen,
                            const char *inbuf, 
                            size_t inbuflen,char **outbuf,
                            size_t *outbuflen,char **outs,
                            size_t *outslen)
{
  tracepoint(librados, rados_mon_command_enter, cluster, cmdlen, inbuf, inbuflen);
  librados::RadosClient *client = (librados::RadosClient*)
                                cluster;
  bufferlist inbl;
  bufferlist outbl;
  string outstring;
  vector<string> cmdvec;

  for (size_t i = 0; i < cmdlen; i++) {
    tracepoint(librados, rados_mon_command_cmd, cmd[i]);
    cmdvec.push_back(cmd[i]);
  }

  inbl.append(inbuf, inbuflen);
  int ret = client->mon_command(cmdvec, inbl, &outbl, &outstring);

  do_out_buffer(outbl, outbuf, outbuflen);
  do_out_buffer(outstring, outs, outslen);
  tracepoint(librados, rados_mon_command_exit, ret, outbuf, 
              outbuflen, outs, outslen);
  return ret;
}


【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_python

在最新版本中mgr负责了一些诸如ceph osd df命令的解析。mgr承担了一部分原本由monitor来完成的事情

在下面的函数里也进行了对命令的处理

​实例演示​

摘抄自​:​​https://blog.51cto.com/u_15127692/3915231​​

ceph -s是如何与ceph cluster进行交互的呢?

命令行敲入ceph -s,执行/usr/bin/ceph, 传入参数-s

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_ceph_02

 从该文件起初的导入部分可以得知:

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_python_03

 由send_command()去与ceph cluster通信:

def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0,
verbose=False):
""" 注释部分:使用librados的mon_command/osd_command/pg_command给daemon发送指令
Send a command to a daemon using librados's
mon_command, osd_command, or pg_command. Any bulk input data
comes in inbuf.

Returns (ret, outbuf, outs); ret is the return code, outbuf is
the outbl "bulk useful output" buffer, and outs is any status
or error message (intended for stderr).

If target is osd.N, send command to that osd (except for pgid cmds)
"""
cmd = cmd or []
try: #target[0] 用以区分发送的目标类型,mon、osd、pg、mds的
if target[0] == 'osd':
osdid = target[1]

if verbose:
print >> sys.stderr, 'submit {0} to osd.{1}'.\
format(cmd, osdid)
ret, outbuf, outs = \
cluster.osd_command(osdid, cmd, inbuf, timeout)  #osd中消息发送主体

elif target[0] == 'pg':
pgid = target[1]
# pgid will already be in the command for the pg <pgid>
# form, but for tell <pgid>, we need to put it in
if cmd:
cmddict = json.loads(cmd[0])
cmddict['pgid'] = pgid
else:
cmddict = dict(pgid=pgid)
cmd = [json.dumps(cmddict)]
if verbose:
print >> sys.stderr, 'submit {0} for pgid {1}'.\
format(cmd, pgid)
ret, outbuf, outs = \
cluster.pg_command(pgid, cmd, inbuf, timeout)

elif target[0] == 'mon':
if verbose:
print >> sys.stderr, '{0} to {1}'.\
format(cmd, target[0])
if target[1] == '':
ret, outbuf, outs = cluster.mon_command(cmd, inbuf, timeout)
else:
ret, outbuf, outs = cluster.mon_command(cmd, inbuf, timeout, target[1])
elif target[0] == 'mds':
mds_spec = target[1]

if verbose:
print >> sys.stderr, 'submit {0} to mds.{1}'.\
format(cmd, mds_spec)

try:
from cephfs import LibCephFS
except ImportError:
raise RuntimeError("CephFS unavailable, have you installed libcephfs?")

filesystem = LibCephFS(cluster.conf_defaults, cluster.conffile)
filesystem.conf_parse_argv(cluster.parsed_args)

filesystem.init()
ret, outbuf, outs = \
filesystem.mds_command(mds_spec, cmd, inbuf)
filesystem.shutdown()
else:
raise ArgumentValid("Bad target type '{0}'".format(target[0]))

except Exception as e:
if not isinstance(e, ArgumentError):
raise RuntimeError('"{0}": exception {1}'.format(cmd, e))
else:
raise

return ret, outbuf, outs

此时以给mon的指令为例,继续分析:

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_python_04

 rados_mon_command_target()调用;

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_ceph_05

client->mon_command()调用;client为librados::RadosClient;

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_命令行_06

 继续start_mon_command():

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_python_07

之后调用MonClient::_send_command()里面继续_send_mon_message(m),然后调用底层封装的socket发送消息;

接来下来从mon端(即server端)入手,从接收到消息,解包开始:消息的tag为:MSG_MON_COMMAND,消息主体是一个结构体MMonCommand

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_json_08

从mon/Monitor.cc中的handle_command函数入手,查询ceph cluster状态 get_cluster_status()函数:

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_json_09

继续跟get_cluster_status()函数:

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_json_10

【ceph】ceph命令处理流程代码分析(终端敲命令之后发生的事)_json_11


【本文转自:日本cn2服务器 http://www.558idc.com/jap.html提供,感恩】
上一篇:响应对象Response
下一篇:没有了
网友评论