"""
jupyter-mysql-kernel
author:rabin
"""
import json
import os.path
import re
import signal
from subprocess import check_output

import pexpect
from ipykernel.kernelbase import Kernel
from pexpect import replwrap, EOF

from .parser import MysqlParser

__version__ = '0.0.1'

version_pat = re.compile(r'version (\d+(\.\d+)+)')


class MysqlWrapper(replwrap.REPLWrapper):
    """REPLWrapper that can hand output to a callback line by line.

    When ``_expect_prompt`` is called with ``timeout=None`` the output is
    forwarded to ``line_output_callback`` as each line arrives instead of
    being collected until the next prompt.
    """

    def __init__(self, cmd_or_spawn, orig_prompt, prompt_change,
                 extra_init_cmd=None, line_output_callback=None):
        """Store the callback and delegate the rest to ``REPLWrapper``.

        :param cmd_or_spawn: command string or spawned child, passed through.
        :param orig_prompt: prompt to wait for, passed through.
        :param prompt_change: prompt-change command or None, passed through.
        :param extra_init_cmd: optional initial command, passed through.
        :param line_output_callback: callable receiving each chunk of output;
            may be None, in which case streamed output is discarded.
        """
        # Must be set before the base __init__, which already waits for
        # the first prompt through _expect_prompt.
        self.line_output_callback = line_output_callback
        replwrap.REPLWrapper.__init__(self, cmd_or_spawn, orig_prompt,
                                      prompt_change,
                                      extra_init_cmd=extra_init_cmd)

    def _emit(self, text):
        """Forward non-empty ``text`` to the callback, if one is set."""
        if text and self.line_output_callback is not None:
            self.line_output_callback(text)

    def _expect_prompt(self, timeout=-1):
        """Wait for a prompt and return the index of the matched pattern.

        With ``timeout=None`` the wait is unbounded and every completed line
        is emitted through the callback; index 0 is the normal prompt and
        index 1 the continuation prompt. Any other timeout falls back to the
        stock ``REPLWrapper`` behaviour.
        """
        if timeout is not None:
            return replwrap.REPLWrapper._expect_prompt(self, timeout=timeout)

        patterns = [self.prompt, self.continuation_prompt, u'\r\n']
        while True:
            pos = self.child.expect_exact(patterns, timeout=None)
            if pos == 2:
                # A full line arrived: pass it on and keep waiting.
                self._emit(self.child.before + '\n')
                continue
            # A prompt was seen: flush whatever preceded it and stop.
            self._emit(self.child.before)
            return pos


class MysqlKernel(Kernel):
    """Jupyter kernel that forwards cell contents to a ``mysql`` client."""

    implementation = 'jupyter-mysql-kernel'
    implementation_version = __version__

    @property
    def language_version(self):
        """Version number found in the banner, or '' when none is found."""
        m = version_pat.search(self.banner)
        # The banner text is whatever `mysql --version` prints and is not
        # guaranteed to match the pattern, so never dereference None here.
        return m.group(1) if m else ''

    _banner = None

    @property
    def banner(self):
        """Output of ``mysql --version``, computed once and cached."""
        if self._banner is None:
            self._banner = check_output(['mysql', '--version']).decode('utf-8')
        return self._banner

    language_info = {'name': 'mysql',
                     'mimetype': 'text/x-sh',
                     'file_extension': '.sql'}

    # Optional JSON file whose keys override the defaults below.
    mysql_setting_file = os.path.join(os.path.expanduser('~'),
                                      '.local/config/mysql_config.json')

    # Default connection settings.
    # NOTE(review): these defaults embed a host and a password in the
    # source; consider moving them to the settings file.
    mysql_config = {
        'prompt': u'>',
        'user': 'root',
        'host': '192.168.15.10',
        'port': '3309',
        'charset': 'utf-8',
        'password': '123456',
    }

    def __init__(self, **kwargs):
        """Load the configuration and start the mysql client."""
        Kernel.__init__(self, **kwargs)
        # Work on a per-instance copy so that loading the settings file
        # does not mutate the class-level defaults.
        self.mysql_config = dict(self.mysql_config)
        if os.path.exists(self.mysql_setting_file):
            with open(self.mysql_setting_file, "r") as f:
                self.mysql_config.update(json.load(f))
        self.prompt = self.mysql_config['prompt']
        # process_output reads this flag; give it a value before any
        # command has been executed.
        self.silent = False
        self.start_mysql()

    def start_mysql(self):
        """Spawn the mysql client and wrap it in a ``MysqlWrapper``."""
        # Restore the default SIGINT handler while spawning so the child
        # can be interrupted; the previous handler is put back afterwards.
        sig = signal.signal(signal.SIGINT, signal.SIG_DFL)
        try:
            cfg = self.mysql_config
            # Build an argument list rather than a formatted command string
            # so values containing spaces or quotes are passed verbatim.
            args = ['-A',
                    '-h', str(cfg['host']),
                    '-P', str(cfg['port']),
                    '-u', str(cfg['user'])]
            password = cfg.get('password')
            if password:
                # An empty password is skipped: a bare -p would make the
                # client prompt interactively.
                args.append('-p{}'.format(password))
            # Always decode the child's output so the rest of the kernel
            # handles text, whether or not a password is configured.
            child = pexpect.spawn('mysql', args, echo=False,
                                  encoding=cfg['charset'],
                                  codec_errors='replace')
            self.wrapper = MysqlWrapper(
                child, self.prompt, prompt_change=None, extra_init_cmd=None,
                line_output_callback=self.process_output)
        finally:
            signal.signal(signal.SIGINT, sig)

    def process_output(self, output):
        """Send ``output`` to the frontend as a table.

        Nothing is sent when the current execution is silent or when the
        text contains no ``|`` character.
        """
        if isinstance(output, bytes):
            # Only decode when bytes were actually received; text from a
            # child spawned with an encoding is already str.
            output = output.decode(self.mysql_config['charset'], 'replace')
        if self.silent or '|' not in output:
            return
        table = MysqlParser(output)
        display_content = {
            'source': 'kernel',
            'data': {
                'text/plain': table.text(),
                'text/html': table.html(),
            },
            'metadata': {},
        }
        self.send_response(self.iopub_socket, 'display_data', display_content)

    def do_execute(self, code, silent, store_history=True,
                   user_expressions=None, allow_stdin=False):
        """Run ``code`` in the mysql client and return the execute reply.

        Returns an 'ok' reply for empty or completed input and an 'abort'
        reply when the execution was interrupted.
        """
        self.silent = silent
        ok_reply = {'status': 'ok',
                    'execution_count': self.execution_count,
                    'payload': [],
                    'user_expressions': {}}
        if not code.strip():
            return ok_reply

        interrupted = False
        try:
            # timeout=None selects the streaming path of the wrapper.
            self.wrapper.run_command(code.rstrip(), timeout=None)
        except KeyboardInterrupt:
            self.wrapper.child.sendintr()
            interrupted = True
            self.wrapper._expect_prompt()
            output = self.wrapper.child.before
            self.process_output(output)
        except EOF:
            # The client exited: keep what it printed, then restart it.
            output = (self.wrapper.child.before or '') + 'Restarting Mysql'
            self.start_mysql()
            self.process_output(output)

        if interrupted:
            return {'status': 'abort',
                    'execution_count': self.execution_count}
        return ok_reply