python-signal

signal 进行进程间异步通讯

signal 进行进程间线程异步通讯主要使用在控制层面,通过发送信号和捕获信号进行处理的方式交互。模式基本上和 APUE 一致,都遵循POSIX标准。给相应的信号注册一个处理器函数,当接受到信号时自行处理。

Python 中,signal不建议用在线程之间!因为线程中使用signal有很多限制,线程通讯可以使用锁或者event。

python sinnal 遵循 BSD style interface,除了SIGCHLD和具体实现有关。

即使在临界区中,signal 也无法被阻塞; 信号可以被延迟执行;

当进程正在执行 IO 操作的时候,因为一个信号,可能会在信号处理器函数 handler 返回之后抛出IO异常。

python 中 SIGPIPE 信号默认被忽略(因此在 pipes 和 sockets 上的写错误会被当成异常处理,SIGINT 默认被转换成 KeyboardInterrupt 异常。

signal.signal() 函数只能在主线程中使用,即只有主线程可以注册信号处理函数; 任何一个线程都可以使用 alarm(), getsignal(), pause(), setitimer(), getitimer(). 只有主线程可以接受信号,这就是为什么线程通讯不适合用signal。

import signal

def get_trap_signums():
    signal_list = ('SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGTERM')
    signum_list = []
    for signal_name in signal_list:
        signum_list.append(getattr(signal, signal_name))
    return signum_list

get_trap_signums()

常用信号发送

pause()信号发送(Windows上不可得)

调用pause之后,进程睡眠一直到接受到一个信号。

siginterrupt(signalnum, flag)改变系统调用的行为

如果对应signalnum的flag设置为true,那么被该signalnum中断的系统调用会重新启动调用。

set_wakeup_fd(fd)

设置fd,当信号到达的时候, \0 字节会被写入该fd。

setitimer(which, seconds[,interval])

alarm(time)信号发送

打开文件设备无穷等待时候,使用alarm() 函数可以设置打开超时,抛出异常。

import signal, os

def handler(signum, frame):
    print 'Signal handler called with signal', signum
    raise IOError("Couldn't open device!")

# Set the signal handler and a 5-second alarm
signal.signal(signal.SIGALRM, handler)
signal.alarm(5)

# This open() may hang indefinitely
fd = os.open('/dev/ttyS0', os.O_RDWR)

signal.alarm(0)          # Disable the alarm

prefork 模式,通过信号管理从进程,Python中prefork模式的简单实现

# -*- coding: utf-8 -*-
#master-slaves.py  python2.7.x
#orangleliu@gmail.com
'''
简单的模拟pre-fork模式,master进程控制多个子进程

这里实现这么几个信号
INT ctrl+c 退出
TTIN 增加一个worker
TTOU 减少一个worker
'''

import os
import sys
import signal
import time
import random


class Worker(object):
    '''
    子进程要实现一些特定的信号来响应外界和父进程的操作
    '''

    def run(self):
        while True:
            time.sleep(3)


class Master(object):

    WORKERS = {}
    SIG_QUEUE = []
    SIGNALS = [getattr(signal, "SIG%s" % x)
                for x in "INT TTIN TTOU".split()]
    SIG_NAMES = dict(
        (getattr(signal, name), name[3:].lower()) for name in dir(signal)
        if name[:3] == "SIG" and name[3] != "_"
    )

    def __init__(self, worker_nums=2):
        self.worker_nums = worker_nums
        self.master_name = "Master"
        self.reexec_pid = 0

    def start(self):
        print "start master"

        self.pid = os.getpid()
        self.init_signals()

    def init_signals(self):
        [signal.signal(s, self.signal) for s in self.SIGNALS]
        signal.signal(signal.SIGCHLD, self.handle_chld)

    def signal(self, sig, frame):
        '''
        普通的信号发生的时候,往信号队列增加一个信号
        '''
        if len(self.SIG_QUEUE) < 5:
            self.SIG_QUEUE.append(sig)

    def run(self):
        self.start()

        try:
            self.manage_workers()
            while True:
                # 如果不增加sleep 整个master进程就会进入几乎100 cpu的状态
                # 使用sleep的好处就是master的cpu消耗小很多,对于来自系统的给master的信号可以即使反馈
                time.sleep(1)

                sig = self.SIG_QUEUE.pop(0) if len(self.SIG_QUEUE) else None
                if sig is None:
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    print "unknow signals:%s"%sig
                    continue

                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s"%signame, None)
                if not handler:
                    print "Unhandler signal: %s"%signame
                    continue

                handler()
        except StopIteration:
            self.halt()
        except KeyboardInterrupt:
            self.halt()
        except SystemExit:
            pass
        except Exception as e:
            print e
            self.stop()
            sys.exit(-1)

    def handle_chld(self, sig, frame):
        '''
        对于子进程退出SIGCHLD信号处理,防止产生大量僵尸进程
        '''
        self.reap_workers()

    def handle_int(self):
        '''
        ctrl+c 关闭master进程,先关闭子进程,然后抛出异常,自己退出
        '''
        self.stop()
        raise StopIteration

    def handle_ttin(self):
        '''
        增加一个子进程
        '''
        print "add a worker"
        self.worker_nums += 1
        self.manage_workers()

    def handle_ttou(self):
        '''
        减少一个子进程
        '''
        print "deincrease a worker"
        if self.worker_nums <= 1:
            return
        self.worker_nums -= 1
        self.manage_workers()

    def stop(self):
        '''
        停止子进程 这里都当做SIGTERM来处理
        '''
        print 'stop workers'
        sig = signal.SIGTERM
        self.kill_workers(sig)
        self.kill_workers(signal.SIGKILL)

    def halt(self, exit_status=0):
        '''
        master 进程自杀
        '''
        print "master exit"
        self.stop()
        sys.exit(exit_status)

    def reap_workers(self):
        '''
        这里的检测也是为了避免僵尸进程,否则大量资源无法释放
        参考:http://www.cnblogs.com/mickole/p/3187770.html
        '''
        try:
            while True:
                #os.waitpid 收集僵尸子进程的信息,并把它彻底销毁后返回
                #这里的 -1 代表所有子进程
                #os.WNOHANG 如果没有子进程信息就立刻返回
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                else:
                    exitcode = status >> 8
                    worker = self.WORKERS.pop(wpid, None)
                    if not worker:
                        continue
        except OSError as e:
            #errno.ECHILD 是没有子进程错误
            if e.error != errno.ECHILD:
                raise


    def manage_workers(self):
        '''
        workers 的健康检查,数量是否对齐等
        '''
        if len(self.WORKERS.keys()) < self.worker_nums:
            self.spawn_workers()

        workers = self.WORKERS.items()
        while len(workers) > self.worker_nums:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGTERM)

    def spawn_worker(self):
        worker = Worker()
        pid = os.fork()

        #master进程处理
        if pid != 0:
            self.WORKERS[pid] = worker
            return pid

        #worker进程处理
        worker_pid = os.getpid()
        try:
            worker.run()
            sys.exit(0)
        except SystemExit:
            raise
        except Exception as e:
            print "work error %s"%str(e)
            sys.exit(-1)

    def spawn_workers(self):
        for i in range(self.worker_nums - len(self.WORKERS.keys())):
            self.spawn_worker()
            #为什么要那么端时间的休眠
            time.sleep(0.1*random.random())

    def kill_workers(self, sig):
        worker_pids = list(self.WORKERS.keys())
        for pid in worker_pids:
            self.kill_worker(pid, sig)

    def kill_worker(self, pid, sig):
        try:
            os.kill(pid, sig)
        except OSError as e:
            print "kill worker error: %s"%str(e)


if __name__ == "__main__":
    Master().run()