123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- Modbus TestKit: Implementation of Modbus protocol in python
- (C)2009 - Luc Jean - luc.jean@gmail.com
- (C)2009 - Apidev - http://www.apidev.fr
- This is distributed under GNU LGPL license, see license.txt
- The modbus_tk simulator is a console application which is running a server with TCP and RTU communication
- It is possible to interact with the server from the command line or from a RPC (Remote Process Call)
- """
- from __future__ import print_function
- import ctypes
- import os
- import sys
- import select
- import serial
- import threading
- import time
- import modbus_tk
- from modbus_tk import hooks
- from modbus_tk import modbus
- from modbus_tk import modbus_tcp
- from modbus_tk import modbus_rtu
- if modbus_tk.utils.PY2:
- import Queue as queue
- import SocketServer
- else:
- import queue
- import socketserver as SocketServer
- # add logging capability
- LOGGER = modbus_tk.utils.create_logger(name="console", record_format="%(message)s")
- # The communication between the server and the user interfaces (console or rpc) are done through queues
- # command received from the interfaces
- INPUT_QUEUE = queue.Queue()
- # response to be sent back by the interfaces
- OUTPUT_QUEUE = queue.Queue()
- class CompositeServer(modbus.Server):
- """make possible to have several servers sharing the same databank"""
- def __init__(self, list_of_server_classes, list_of_server_args, databank=None):
- """Constructor"""
- super(CompositeServer, self).__init__(databank)
- self._servers = [
- the_class(*the_args, **{"databank": self.get_db()})
- for the_class, the_args in zip(list_of_server_classes, list_of_server_args)
- if issubclass(the_class, modbus.Server)
- ]
- def set_verbose(self, verbose):
- """if verbose is true the sent and received packets will be logged"""
- for srv in self._servers:
- srv.set_verbose(verbose)
- def _make_thread(self):
- """should initialize the main thread of the server. You don't need it here"""
- pass
- def _make_query(self):
- """Returns an instance of a Query subclass implementing the MAC layer protocol"""
- raise NotImplementedError()
- def start(self):
- """Start the server. It will handle request"""
- for srv in self._servers:
- srv.start()
- def stop(self):
- """stop the server. It doesn't handle request anymore"""
- for srv in self._servers:
- srv.stop()
- class RpcHandler(SocketServer.BaseRequestHandler):
- """An instance of this class is created every time an RPC call is received by the server"""
- def handle(self):
- """This function is called automatically by the SocketServer"""
- # self.request is the TCP socket connected to the client
- # read the incoming command
- request = self.request.recv(1024).strip()
- # write to the queue waiting to be processed by the server
- INPUT_QUEUE.put(request)
- # wait for the server answer in the output queue
- response = OUTPUT_QUEUE.get(timeout=5.0)
- # send back the answer
- self.request.send(response)
- class RpcInterface(threading.Thread):
- """Manage RPC call over TCP/IP thanks to the SocketServer module"""
- def __init__(self):
- """Constructor"""
- super(RpcInterface, self).__init__()
- self.rpc_server = SocketServer.TCPServer(("", 2711), RpcHandler)
- def run(self):
- """run the server and wait that it returns"""
- self.rpc_server.serve_forever(0.5)
- def close(self):
- """force the socket server to exit"""
- try:
- self.rpc_server.shutdown()
- self.join(1.0)
- except Exception:
- LOGGER.warning("An error occurred while closing RPC interface")
- class ConsoleInterface(threading.Thread):
- """Manage user actions from the console"""
- def __init__(self):
- """constructor: initialize communication with the console"""
- super(ConsoleInterface, self).__init__()
- self.inq = INPUT_QUEUE
- self.outq = OUTPUT_QUEUE
- if os.name == "nt":
- ctypes.windll.Kernel32.GetStdHandle.restype = ctypes.c_ulong
- self.console_handle = ctypes.windll.Kernel32.GetStdHandle(ctypes.c_ulong(0xfffffff5))
- ctypes.windll.Kernel32.WaitForSingleObject.restype = ctypes.c_ulong
- elif os.name == "posix":
- # select already imported
- pass
- else:
- raise Exception("%s platform is not supported yet" % os.name)
- self._go = threading.Event()
- self._go.set()
- def _check_console_input(self):
- """test if there is something to read on the console"""
- if os.name == "nt":
- if 0 == ctypes.windll.Kernel32.WaitForSingleObject(self.console_handle, 500):
- return True
- elif os.name == "posix":
- (inputready, abcd, efgh) = select.select([sys.stdin], [], [], 0.5)
- if len(inputready) > 0:
- return True
- else:
- raise Exception("%s platform is not supported yet" % os.name)
- return False
- def run(self):
- """read from the console, transfer to the server and write the answer"""
- while self._go.isSet(): #while app is running
- if self._check_console_input(): #if something to read on the console
- cmd = sys.stdin.readline() #read it
- self.inq.put(cmd) #dispatch it tpo the server
- response = self.outq.get(timeout=2.0) #wait for an answer
- sys.stdout.write(response) #write the answer on the console
- def close(self):
- """terminates the thread"""
- self._go.clear()
- self.join(1.0)
- class Simulator(object):
- """The main class of the app in charge of running everything"""
- def __init__(self, server=None):
- """Constructor"""
- if server is None:
- self.server = CompositeServer([modbus_rtu.RtuServer, modbus_tcp.TcpServer], [(serial.Serial(0),), ()])
- else:
- self.server = server
- self.rpc = RpcInterface()
- self.console = ConsoleInterface()
- self.inq, self.outq = INPUT_QUEUE, OUTPUT_QUEUE
- self._hooks_fct = {}
- self.cmds = {
- "add_slave": self._do_add_slave,
- "has_slave": self._do_has_slave,
- "remove_slave": self._do_remove_slave,
- "remove_all_slaves": self._do_remove_all_slaves,
- "add_block": self._do_add_block,
- "remove_block": self._do_remove_block,
- "remove_all_blocks": self._do_remove_all_blocks,
- "set_values": self._do_set_values,
- "get_values": self._do_get_values,
- "install_hook": self._do_install_hook,
- "uninstall_hook": self._do_uninstall_hook,
- "set_verbose": self._do_set_verbose,
- }
- def add_command(self, name, fct):
- """add a custom command"""
- self.cmds[name] = fct
- def start(self):
- """run the servers"""
- self.server.start()
- self.console.start()
- self.rpc.start()
- LOGGER.info("modbus_tk.simulator is running...")
- self._handle()
- def declare_hook(self, fct_name, fct):
- """declare a hook function by its name. It must be installed by an install hook command"""
- self._hooks_fct[fct_name] = fct
- def _tuple_to_str(self, the_tuple):
- """convert a tuple to a string"""
- ret = ""
- for item in the_tuple:
- ret += (" " + str(item))
- return ret[1:]
- def _do_add_slave(self, args):
- """execute the add_slave command"""
- slave_id = int(args[1])
- self.server.add_slave(slave_id)
- return "{0}".format(slave_id)
- def _do_has_slave(self, args):
- """execute the has_slave command"""
- slave_id = int(args[1])
- try:
- self.server.get_slave(slave_id)
- except Exception:
- return "0"
- return "1"
- def _do_remove_slave(self, args):
- """execute the remove_slave command"""
- slave_id = int(args[1])
- self.server.remove_slave(slave_id)
- return ""
- def _do_remove_all_slaves(self, args):
- """execute the remove_slave command"""
- self.server.remove_all_slaves()
- return ""
- def _do_add_block(self, args):
- """execute the add_block command"""
- slave_id = int(args[1])
- name = args[2]
- block_type = int(args[3])
- starting_address = int(args[4])
- length = int(args[5])
- slave = self.server.get_slave(slave_id)
- slave.add_block(name, block_type, starting_address, length)
- return name
- def _do_remove_block(self, args):
- """execute the remove_block command"""
- slave_id = int(args[1])
- name = args[2]
- slave = self.server.get_slave(slave_id)
- slave.remove_block(name)
- def _do_remove_all_blocks(self, args):
- """execute the remove_all_blocks command"""
- slave_id = int(args[1])
- slave = self.server.get_slave(slave_id)
- slave.remove_all_blocks()
- def _do_set_values(self, args):
- """execute the set_values command"""
- slave_id = int(args[1])
- name = args[2]
- address = int(args[3])
- values = []
- for val in args[4:]:
- values.append(int(val))
- slave = self.server.get_slave(slave_id)
- slave.set_values(name, address, values)
- values = slave.get_values(name, address, len(values))
- return self._tuple_to_str(values)
- def _do_get_values(self, args):
- """execute the get_values command"""
- slave_id = int(args[1])
- name = args[2]
- address = int(args[3])
- length = int(args[4])
- slave = self.server.get_slave(slave_id)
- values = slave.get_values(name, address, length)
- return self._tuple_to_str(values)
- def _do_install_hook(self, args):
- """install a function as a hook"""
- hook_name = args[1]
- fct_name = args[2]
- hooks.install_hook(hook_name, self._hooks_fct[fct_name])
- def _do_uninstall_hook(self, args):
- """
- uninstall a function as a hook.
- If no function is given, uninstall all functions
- """
- hook_name = args[1]
- try:
- hooks.uninstall_hook(hook_name)
- except KeyError as exception:
- LOGGER.error(str(exception))
- def _do_set_verbose(self, args):
- """change the verbosity of the server"""
- verbose = int(args[1])
- self.server.set_verbose(verbose)
- return "%d" % verbose
- def _handle(self):
- """almost-for-ever loop in charge of listening for command and executing it"""
- while True:
- cmd = self.inq.get()
- args = cmd.strip('\r\n').split(' ')
- if cmd.find('quit') == 0:
- self.outq.put('bye-bye\r\n')
- break
- elif args[0] in self.cmds:
- try:
- answer = self.cmds[args[0]](args)
- self.outq.put("%s done: %s\r\n" % (args[0], answer))
- except Exception as msg:
- self.outq.put("%s error: %s\r\n" % (args[0], msg))
- else:
- self.outq.put("error: unknown command %s\r\n" % (args[0]))
- def close(self):
- """close every server"""
- self.console.close()
- self.rpc.close()
- self.server.stop()
- def print_me(args):
- """hook function example"""
- request = args[1]
- print("print_me: len = ", len(request))
- def run_simulator():
- """run simulator"""
- simulator = Simulator()
- try:
- LOGGER.info("'quit' for closing the server")
- simulator.declare_hook("print_me", print_me)
- simulator.start()
- except Exception as exception:
- print(exception)
- finally:
- simulator.close()
- LOGGER.info("modbus_tk.simulator has stopped!")
- # In python 2.5, the SocketServer shutdown is not working Ok
- # The 2 lines below are an ugly temporary workaround
- time.sleep(1.0)
- sys.exit()
- if __name__ == "__main__":
- run_simulator()
|