Bulk Modbus TCP Device Simulation for Gateway Testing
When conducting stress tests on edge gateways, manually configuring Modbus simulation tools becomes cumbersome—especially when register values need to increment automatically. A Python-based solutino provides better control and automation for generating multiple virtual Modbus devices.
The following implementation creates 100 Modbus TCP servers, each running on sequential ports with auto-incrementing holding registers:
from pymodbus.server.sync import StartTcpServer, ModbusTcpServer
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSlaveContext, ModbusServerContext
from pymodbus.transaction import ModbusSocketFramer
import logging
import threading
import socket
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('pymodbus')
logger.setLevel(logging.DEBUG)
def build_incrementing_registers(initial_value, count):
"""Generate a list of incrementing register values."""
logger.debug(f"Generating {count} registers starting from {initial_value}")
return [initial_value + idx for idx in range(count)]
# Initialize data store with different register types
data_store = ModbusSlaveContext(
di=ModbusSequentialDataBlock(0, [0] * 10), # Discrete inputs
co=ModbusSequentialDataBlock(0, [0] * 10), # Coils
hr=build_incrementing_registers(0, 100), # Holding registers
ir=ModbusSequentialDataBlock(0, [0] * 10) # Input registers
)
server_context = ModbusServerContext(slaves=data_store, single=True)
# Configure device identification
device_info = ModbusDeviceIdentification()
device_info.VendorName = 'pymodbus'
device_info.ProductCode = 'PM'
device_info.VendorUrl = 'http://github.com/riptideio/pymodbus/'
device_info.ProductName = 'pymodbus Server'
device_info.ModelName = 'pymodbus Server'
device_info.MajorMinorRevision = '1.0'
class ModbusServerWrapper(ModbusTcpServer):
"""Custom server wrapper with connection logging."""
def handle(self):
try:
logger.debug(f"Incoming connection from [{self.client_address}]")
super().handle()
except Exception as err:
logger.error(f"Client handling error {self.client_address}: {err}")
finally:
logger.debug(f"Connection closed [{self.client_address}]")
def start_modbus_instance(listen_port):
"""Launch a single Modbus TCP server instance."""
logger.debug(f"Initializing server on port {listen_port}...")
try:
server = ModbusServerWrapper(
server_context,
identity=device_info,
address=("10.245.3.9", listen_port),
framer=ModbusSocketFramer
)
server.serve_forever()
except Exception as e:
logger.error(f"Failed to start server on port {listen_port}: {e}")
logger.debug(f"Server active on port {listen_port}")
# Launch servers across multiple ports in separate threads
for port in range(502, 602):
worker = threading.Thread(target=start_modbus_instance, args=(port,))
worker.start()
print("Modbus server cluster initialization complete.")
# Verify port binding status
for port in range(502, 602):
check_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
status = check_socket.connect_ex(("10.245.3.9", port))
if status == 0:
print(f"Port {port} - listening")
else:
print(f"Port {port} - unavailable")
check_socket.close()
Configuration Notes:
- Replace
10.245.3.9with the local machine's IP address - The server cluster spans ports 502 through 601 (100 total instances)
- Each simulated device exposes 100 holding registers with values that increment automatically from 0
- Discrete inputs, coils, and input registers are initailized to zero as placeholders