unix_socket_deamon

Local processes on the same machine communicate with each other via REST-style requests.
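
A quick way to see the idea is to speak plain HTTP over an AF_UNIX socket by hand. A minimal sketch (the socket path here is hypothetical; the daemon's real path is derived from its workspace, as the code below shows):

# -*- coding: utf-8 -*-
# Minimal sketch: one HTTP/1.0 request over a unix domain socket.
# '/tmp/task_deamon.socket' is a made-up path, for illustration only.
import socket

s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect('/tmp/task_deamon.socket')
s.sendall('GET /ping HTTP/1.0\r\nHost: localhost\r\n\r\n')
print s.recv(4096)      # raw status line, headers and body
s.close()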

A unix socket lets different processes on one host talk to each other through HTTP-like requests. The daemon service's startup entry point is init_deamon_process, in the module below.

# -*- coding: utf-8 -*-
import os
import socket
import json
import subprocess
import logging
from tornado.options import options
from tornado.httpclient import HTTPError as ClientHTTPError
from clouds.common import codeutils
from clouds.common.tornadoutils import cerror2serror


def is_service_on(unix_socket):
    """ Probe the socket file to check whether the daemon is already listening. """
    if not os.path.exists(unix_socket):
        return False
    s = socket.socket(socket.AF_UNIX)
    try:
        s.connect(unix_socket)
    except socket.error:
        return False
    else:
        return True
    finally:
        s.close()


_deamon_process = None      # child process handle, set by init_deamon_process
_workspace = None
_unix_socket = None


def register_deamon_task(id, action, zone, region_task_id=None):
    from clouds.common.http_uds_client import get_client
    client = get_client()
    url = '/'
    data = {'id': id, 'action': action, 'zone': zone}
    if region_task_id is not None:
        data['region_task_id'] = region_task_id
    body = json.dumps(data)
    try:
        client.sync_request(_unix_socket, 'POST', url, body=body)
    except ClientHTTPError as e:
        serr = cerror2serror(e)
        logging.error('Register daemon long-running task failed: %s' % str(serr))
        raise serr


def init_deamon_process():
    global _deamon_process, _workspace, _unix_socket
    _workspace = os.path.join(codeutils.get_workspace_path(), 'task_deamon')
    if not os.path.exists(_workspace):
        os.mkdir(_workspace)
    _unix_socket = os.path.join(_workspace, 'task_deamon.socket')
    if is_service_on(_unix_socket):
        logging.info('task_deamon is running...')
    else:
        logging.info('starting region_mebsd...')
        _deamon_process = subprocess.Popen(
            [
                "./task_deamon",
                "--workspace=%s" % _workspace,
                "--unix-socket=%s" % _unix_socket,
                "--config-file=%s" % options.config_file,
            ], close_fds=True)                              # close_fds doesn't apply to fd 0,1,2


def stop_deamon_process():
    global _deamon_process
    if _deamon_process is not None:
        _deamon_process.terminate()   # send signal TERM
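
Putting the three entry points together, the region service would drive them roughly like this (a sketch; the ids and zone are made up, and options/config are assumed to be initialized already):

# Hypothetical call sequence inside the region service:
init_deamon_process()       # spawn ./task_deamon unless one is already listening
register_deamon_task('disk-123', 'backup', 'zone-1',
                     region_task_id='task-456')     # illustrative arguments
# ... service keeps running; the daemon polls and calls back ...
stop_deamon_process()       # on shutdown, SIGTERM the child daemon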

The unix socket client used to talk to the daemon process:

from clouds.common.httputils import AsyncHTTPClientManager
import urllib
import httplib
import socket
import sys


class HttpUnixDomainSocketClient(AsyncHTTPClientManager):

    def sync_request(self, socket_path, method, url, *args, **kwargs):
        quote_path = urllib.quote(socket_path, safe='')
        if not url.startswith('/'):
            url = '/' + url
        req_url = 'http+unix://' + quote_path + url
        kwargs['sync'] = True
        return self.json_fetch(method, req_url, *args, **kwargs)


_default_client = None


def get_client():
    global _default_client
    if _default_client is None:
        print "Http unix domain socket client initialized"
        _default_client = HttpUnixDomainSocketClient()
    return _default_client


def _has_timeout(timeout):
    if hasattr(socket, '_GLOBAL_DEFAULT_TIMEOUT'):
        return timeout is not None and timeout is not socket._GLOBAL_DEFAULT_TIMEOUT
    return timeout is not None


class HttpUnixDomainSocketConnection(httplib.HTTPConnection):
    """
    HTTP over UNIX Domain Sockets
    URI format: http+unix://%2Fpath%2Fto%2Fsocket.sock/location
    Socket path is url_quoted
    """

    def __init__(self, host, port=None, strict=None, timeout=None, proxy_info=None):
        # strict and proxy_info are accepted for httplib2 compatibility but unused
        httplib.HTTPConnection.__init__(self, host, port)
        self.timeout = timeout

    def connect(self):
        try:
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            if _has_timeout(self.timeout):
                self.sock.settimeout(self.timeout)

            # the socket path was url-quoted into the host part of the URL
            self.sock.connect(urllib.unquote(self.host))
        except socket.error:
            if self.sock:
                self.sock.close()
            self.sock = None
            raise   # re-raise with the original traceback


# Monkey-patch the new scheme into httplib2 (httplib2 must already be imported,
# otherwise the sys.modules lookup raises KeyError)
sys.modules['httplib2'].SCHEME_TO_CONNECTION['http+unix'] = HttpUnixDomainSocketConnection
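
For example, with an illustrative socket path, sync_request assembles the request URL like this:

# Illustrative only: how the socket path ends up in the "host" part of the URL.
import urllib
path = '/srv/clouds/task_deamon/task_deamon.socket'     # hypothetical path
print urllib.quote(path, safe='')
# -> '%2Fsrv%2Fclouds%2Ftask_deamon%2Ftask_deamon.socket'
# so a POST to '/' goes to:
#   http+unix://%2Fsrv%2Fclouds%2Ftask_deamon%2Ftask_deamon.socket/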

The daemon process for long-running tasks is built on the tornado async framework; it is effectively a backend web service. When a long-running task finishes, the daemon calls back to the task's registrant with a REST-style request to report completion. In other words, from the registrant's point of view the task runs asynchronously, and this asynchrony is built on REST rather than on RPC or message queues.
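
To make the round trip concrete, these are the two requests involved (the field values are illustrative; the endpoints and field names are the ones used by the code below):

# 1) Registrant -> daemon, POSTed over the unix socket:
#      POST /
#      {"id": "disk-123", "action": "backup", "zone": "zone-1",
#       "region_task_id": "task-456"}
#
# 2) Daemon -> region compute API, once the task ends:
#      POST /tasks/task-456                              # on success
#      POST /tasks/task-456
#      {"__status__": "error",
#       "__reason__": "region_mebsd task failed"}        # on failure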

import os
import json
import urlparse
import sys
import tornado
import Queue
import threading
import signal
import datetime
import functools

from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.httpserver import HTTPServer
from tornado.httputil import HTTPHeaders
from tornado.ioloop import IOLoop
from tornado.netutil import bind_unix_socket
from tornado.web import RequestHandler, Application, HTTPError
from tornado.gen import coroutine
from tornado.options import define, options
from clouds.mebs import client as mebs_client
from clouds.common import config
from clouds.common import auth
from clouds.common.handler.ping import PingHandler
from clouds.common.service import ServiceBase

from clouds.models.disks import Disks
from clouds.models.mebs_backups import MEBSBackups


NUM_WORKERS = 4             # polling worker threads
CHECK_INTERVAL = 10         # seconds between two polls of the same task
TIMEOUT = 3600 * 12         # give up on a task after 12 hours
PERSISTENCE_FILE = 'data.keep'

queue = Queue.Queue()       # tasks ready to be polled by a worker thread
persistence = set()         # tasks parked between two polls

ACTIONS = ['clone_snapshot', 'clone', 'backup', 'restore_backup', 'delete_backup',
           'create_template', 'commit_template']


class Task(object):

    def __init__(self, id, action, zone):
        """ Note: backup task doesn't know its size when calling the API,
            needs to report here in region_mebsd
        """
        self.id = id
        self.action = action
        self.zone = zone    # zone name
        self.count = 0
        self.size_in_bytes = 0
        self.size_reported = True
        self.region_task_id = None

    def __str__(self):
        return 'mebsd task => id=%s, action=%s, zone=%s, count=%s' % (self.id, self.action, self.zone, self.count)

    def execute(self):
        # called from a worker thread; outcomes are handed back to the IOLoop
        self.count += CHECK_INTERVAL
        finished, error = self.get_task_status()
        if error:
            IOLoop.instance().add_callback(MainHandler.task_failed, self)
        elif finished:
            IOLoop.instance().add_callback(MainHandler.task_finished, self)
        elif self.count < TIMEOUT:
            IOLoop.instance().add_callback(MainHandler.check_task_later, self)
        else:
            IOLoop.instance().add_callback(MainHandler.task_failed, self)

    def get_task_status(self):
        """ Return (finished, error); (False, False) means still in progress. """
        from clouds.mebs.client import get_client
        mebs_cli = get_client(self.zone)

        if self.action in ('clone_snapshot', 'clone'):
            p = mebs_cli.qcow2.Snapshot.get_clone_progress(self.id)
        elif self.action == 'backup':
            p = mebs_cli.qcow2.Snapshot.get_backup_progress(self.id)
        elif self.action == 'restore_backup':
            p = mebs_cli.qcow2.Snapshot.get_restore_progress(self.id)
        elif self.action == 'delete_backup':
            p = mebs_cli.qcow2.Snapshot.get_delbackup_progress(self.id)
        elif self.action == 'create_template':
            p = mebs_cli.template.get_create_progress(self.id)
        elif self.action == 'commit_template':
            p = mebs_cli.template.get_commit_progress(self.id)
        elif self.action == 'migrate_template':
            p = mebs_cli.qcow2.query_migrate(self.id)
        else:
            return False, True      # unknown action counts as an error

        if p is None:               # progress not available yet
            return False, False
        if self.action == 'backup' and p.total_size_byte > 0 \
                and self.size_in_bytes != p.total_size_byte:
            # the backup size is only known here; flag it for reporting to region
            self.size_in_bytes = p.total_size_byte
            self.size_reported = False
        return p.percent == 100, p.error


class WorkerManager(object):

    def init(self):
        for i in range(NUM_WORKERS):
            t = threading.Thread(target=self.worker)
            t.setDaemon(True)
            t.start()

    @staticmethod
    def worker():
        # each worker blocks on the queue and polls MEBS synchronously;
        # unfinished tasks re-enter the queue via check_task_later
        while True:
            task = queue.get()
            task.execute()
            queue.task_done()


class MainHandler(RequestHandler):

    @coroutine
    def post(self, *args, **kwargs):
        """ json body example:
            {'action': 'action', 'id': 'idstr', 'zone': 'zone'...}
        """
        try:
            body = json.loads(self.request.body)
        except Exception:
            raise HTTPError(400, 'Invalid body')
        if body.get('action') not in ACTIONS:
            raise HTTPError(400, 'Invalid action')
        try:
            task = Task(body['id'], body['action'], body['zone'])
        except KeyError:
            raise HTTPError(400, 'Missing id or zone')
        if 'region_task_id' in body:
            task.region_task_id = body['region_task_id']
        queue.put(task)
        print task, 'start'
        self.write(json.dumps(''))

    @classmethod
    @coroutine
    def json_fetch(cls, method, url, headers=None, data=None):
        service_url = auth.get_manager().get_service_url('compute', region=options.region)
        token = auth.get_manager().get_token()
        url = urlparse.urljoin(service_url, url)
        client = AsyncHTTPClient()
        xheaders = {'X-Auth-Token': token,
                    'Content-Type': 'application/json; charset=utf-8'}
        if headers is not None:
            xheaders.update(headers)
        xheaders = HTTPHeaders(xheaders)
        if data is None:
            data = {}
        body = json.dumps(data)
        request = HTTPRequest(url, method, xheaders, body)
        response = yield client.fetch(request)
        raise tornado.gen.Return(response)

    @classmethod
    @coroutine
    def check_task_later(cls, task):

        def insert(_task):
            persistence.remove(_task)
            queue.put(_task)

        persistence.add(task)
        if task.action == 'backup' and not task.size_reported:
            task.size_reported = True       # only report to region once
            data = {'mebs_backup': {'size': task.size_in_bytes / 1024 / 1024}}
            yield cls.json_fetch('PUT', '/mebs_backups/%s' % task.id, data=data)

        IOLoop.instance().add_timeout(datetime.timedelta(seconds=CHECK_INTERVAL),
                                      functools.partial(insert, task))

    @classmethod
    @coroutine
    def task_finished(cls, task):
        print task, 'finished'
        if task.action == 'clone_snapshot':
            data = {'disk': {'status': Disks.DISK_READY}}
            yield cls.json_fetch('PUT', '/disks/%s' % task.id, data=data)

        elif task.action == 'restore_backup':
            yield cls.json_fetch('POST', '/disks/%s/notify-restore-mebs-backup-succ' % task.id,
                                 data={'disk': {}})

        elif task.action == 'backup':
            data = {'mebs_backup': {'status': MEBSBackups.STATUS_READY}}
            if not task.size_reported:
                data['mebs_backup']['size'] = task.size_in_bytes / 1024 / 1024
            yield cls.json_fetch('PUT', '/mebs_backups/%s' % task.id, data=data)

        elif task.action == 'delete_backup':
            yield cls.json_fetch('POST', '/mebs_backups/%s/notify-delete-succ' % task.id,
                                 data={'mebs_backup': {}})

        if task.region_task_id is not None:
            yield cls.json_fetch('POST', '/tasks/%s' % task.region_task_id)

    @classmethod
    @coroutine
    def task_failed(cls, task):
        print task, 'failed'
        if task.action == 'clone_snapshot':
            yield cls.json_fetch('PUT', '/disks/%s' % task.id,
                                 data={'disk': {'status': Disks.CLONE_MEBS_SNAPSHOT_FAILED}})

        elif task.action == 'restore_backup':
            yield cls.json_fetch('POST', '/disks/%s/notify-restore-mebs-backup-fail' % task.id)

        elif task.action == 'backup':
            yield cls.json_fetch('PUT', '/mebs_backups/%s' % task.id,
                                 data={'mebs_backup': {'status': MEBSBackups.STATUS_BACKUP_FAILED}})

        elif task.action == 'delete_backup':
            yield cls.json_fetch('POST', '/mebs_backups/%s/notify-delete-fail' % task.id)

        if task.region_task_id is not None:
            yield cls.json_fetch('POST', '/tasks/%s' % task.region_task_id,
                                 data={'__status__': 'error', '__reason__': 'region_mebsd task failed'})


def receive_signal(signum, stack):

    def persistence_then_exit():
        def make_data(t):
            return {'id': t.id, 'action': t.action, 'zone': t.zone,
                    'count': t.count, 'size_in_bytes': t.size_in_bytes,
                    'size_reported': t.size_reported,
                    'region_task_id': t.region_task_id}

        data = []
        while not queue.empty():
            data.append(make_data(queue.get()))
        for task in persistence:
            data.append(make_data(task))
        if data:
            with open(os.path.join(options.workspace, PERSISTENCE_FILE), 'w') as f:
                f.write(json.dumps(data))
        sys.exit(0)     # exit the main thread; the daemon worker threads die with it

    if signum in ServiceBase.get_trap_signums():
        IOLoop.instance().add_callback_from_signal(persistence_then_exit)


def trap_signals():
    for signum in ServiceBase.get_trap_signums():
        signal.signal(signum, receive_signal)


def recover_persistence():
    filepath = os.path.join(options.workspace, PERSISTENCE_FILE)
    if not os.path.exists(filepath):
        return
    with open(filepath, 'r') as f:
        data = json.loads(f.read())
    for i in data:
        task = Task(i['id'], i['action'], i['zone'])
        task.count = i['count']
        task.size_in_bytes = i['size_in_bytes']
        task.size_reported = i['size_reported']
        task.region_task_id = i['region_task_id']
        queue.put(task)
    os.unlink(filepath)


def main():
    define('workspace', help='Workspace dir')
    define('unix_socket', help='Unix Domain Socket file')
    define('enable_mebs', default=False, type=bool, help='MEBS service trigger')
    define('mebs_src_dir', help='MEBS API source code dir')
    define('mebs_bin_dir', help='MEBS binary exe dir')
    define('mebs_log_config', default='/etc/mebs/log.conf', help='Logging level control')
    define('mebs_log_dir', help='MEBS log dir')
    define('mebs_manager_config')
    define('mebs_redis_config')
    define('mebs_sql_connection')

    config.init("etc/region.conf")

    config.check_options(['region', 'port', 'auth_uri',
                          'admin_user', 'admin_password',
                          'admin_tenant_name', 'workspace', 'unix_socket'])

    if getattr(options, 'enable_mebs', False):
        config.check_options(['mebs_src_dir', 'mebs_redis_config'])
        mebs_client.configure_at_region()

    # replay persisted tasks after a short delay, giving auth time to initialize
    wait_auth = datetime.timedelta(seconds=2)
    IOLoop.instance().add_timeout(wait_auth, recover_persistence)
    trap_signals()
    auth.init()
    worker_man = WorkerManager()
    worker_man.init()

    application = Application([(r"/ping", PingHandler),
                               (r"/", MainHandler)])
    server = HTTPServer(application, xheaders=True)
    sock = bind_unix_socket(options.unix_socket)
    server.add_socket(sock)
    server.start()
    IOLoop.instance().start()


if __name__ == '__main__':
    main()
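
The registrant's side of the callback is outside the scope of this post; a tornado handler receiving it might look roughly like the sketch below (mark_task_done and mark_task_failed are hypothetical placeholders, not real helpers from the codebase):

# -*- coding: utf-8 -*-
# Sketch of a registrant-side /tasks/<task_id> callback handler (hypothetical).
import json
from tornado.web import RequestHandler


class TaskCallbackHandler(RequestHandler):

    def post(self, task_id):
        body = json.loads(self.request.body or '{}')
        if body.get('__status__') == 'error':
            # region_mebsd reports failures with __status__/__reason__
            mark_task_failed(task_id, body.get('__reason__'))   # hypothetical helper
        else:
            mark_task_done(task_id)                             # hypothetical helper
        self.write(json.dumps(''))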