def div(a,b):
return a/b
def algo(a,b):
return div(1,b) + 1
try:
algo(1,0)
except:
import traceback, StringIO, re, os
def colorize(text, color):
control_sequence_introducer = "\x1B["
return "{0}{1}m{2}{0}0m".format(control_sequence_introducer, color, text)
exception_output = StringIO.StringIO()
traceback.print_exc(file=exception_output)
stack_trace = exception_output.getvalue().splitlines()
print stack_trace
stack_trace.pop(0)
error = stack_trace.pop()
print ''
i = 0
for line in stack_trace:
line = line.strip()
if 0 == i % 2:
matches = re.match('File "(.*)", line (\d+), in (.+)', line)
if matches:
groups = matches.groups()
path = groups[0]
path = path.replace(os.getcwd(), '')
line_number = groups[1]
method = groups[2]
else:
ident =' '
print ident * i + "{0} ({1}): {2}: {3}".format(
colorize(path, 36),
colorize(line_number, 33),
colorize(method, 32),
colorize(line, 31)
)
i = i + 1
print colorize("Error ", 33) + colorize(error, 31)
print ''
colorize
def colorize(text, color):
control_sequence_introducer = "\x1B["
return "{0}{1}m{2}{0}0m".format(control_sequence_introducer, color, text)
print repr(colorize('fuck', 31))
print colorize('fuck', 32)
print colorize('fuck', 33)
print colorize('fuck', 34)
print colorize('fuck', 35)
print colorize('fuck', 36)
print colorize('fuck', 37)
print colorize('fuck', 38)
def print_format_table():
"""
prints table of formatted text format options
"""
for style in range(8):
for fg in range(30,38):
s1 = ''
for bg in range(40,48):
format = ';'.join([str(style), str(fg), str(bg)])
s1 += '\x1b[%sm %s \x1b[0m' % (format, format)
print(s1)
print('\n')
print_format_table()
thread exception dump stack
import thread
import threading
import sys
import traceback
import logging
from tornado.ioloop import IOLoop
def is_main_thread():
cur_tid = thread.get_ident()
main_tid = None
main_inst = IOLoop.instance()
if main_inst is not None:
main_tid = main_inst._thread_ident
if main_tid is None:
main_tid = cur_tid
if cur_tid != main_tid:
return False
else:
return True
# class safe_acquire_lock(object):
# def __init__(self, lock):
# self.lock = lock
# def __enter__(self):
# self.lock.acquire()
# def __exit__(self, type, value, traceback):
# self.lock.release()
def get_thread_id():
th = threading.current_thread()
return '%d-%s' % (th.ident, th.name)
def dump_thread_stacks():
frames = sys._current_frames()
for t in threading.enumerate():
if t.ident is not None:
f = frames.get(t.ident)
stack = traceback.extract_stack(f)
logging.info('%s(%s-%d): %s', t, t.__class__.__name__, t.ident, stack)
def get_object_in_stack(test_func):
frame = sys._getframe()
while frame.f_back is not None:
for v in frame.f_locals.values():
if test_func(v):
return v
frame = frame.f_back
return None
def get_object_top_call_in_stack(obj, excludes=[]):
calls = []
for f in dir(obj):
a = getattr(obj, f)
if callable(a) and \
(not isinstance(excludes, list) or f not in excludes):
calls.append(f)
frame = sys._getframe().f_back
top_call = None
while frame is not None:
if obj in frame.f_locals.values() and \
frame.f_code.co_name in calls:
top_call = frame.f_code.co_name
frame = frame.f_back
return top_call
def get_object_of_classes_in_stack(cls_list, excludes=None):
def _is_instance(v):
for cls in cls_list:
if isinstance(v, cls) and \
(excludes is None or v not in excludes):
return True
return False
return get_object_in_stack(_is_instance)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
class Test(object):
def call(self):
dump_thread_stacks()
# print get_object_of_classes_in_stack((Test,))
# print get_object_top_call_in_stack(self)
Test().call()
class worker_thread(object):
def __init__(self, func):
self.__func = func
def __call__(self, *args, **kwargs):
if self.__func:
if is_main_thread():
thread.start_new(lambda: self.__do_func(*args, **kwargs), ())
else:
self.__do_func(*args, **kwargs)
def __do_func(self, *args, **kwargs):
try:
self.__func(*args, **kwargs)
except Exception as e:
logging.error(e)
def assert_main_thread_invoke(name='invoke'):
if is_main_thread():
import traceback
stack = traceback.format_stack()
s_stack = ''.join(stack)
logging.warning('%s on main thread: %s' % (name, s_stack))