Commit ee208223 authored by yogesh.m's avatar yogesh.m

Added ProconOs

parent b8989f60
import binascii import binascii
import json import json
from protocol_discover_helpers import modbus_discover,enip_discover,s7_discover,omron_discover,bacnet_discover,codesys_discover,dnp3_discover,fox_discover from protocol_discover_helpers import modbus_discover,enip_discover,s7_discover,omron_discover,bacnet_discover,codesys_discover,dnp3_discover,fox_discover,proconos_discover
class Packet_Analyzer(): class Packet_Analyzer():
def __init__(self): def __init__(self):
self.data=None self.data=None
self.protocol_list={502: modbus_discover,44818:enip_discover,102:s7_discover,9600:omron_discover,47808:bacnet_discover,2455:codesys_discover,20000:dnp3_discover,1911:fox_discover} self.protocol_list={502: modbus_discover,44818:enip_discover,102:s7_discover,9600:omron_discover,47808:bacnet_discover,2455:codesys_discover,20000:dnp3_discover,1911:fox_discover,20547:proconos_discover}
def identify_protocol(self,hex_pkt): def identify_protocol(self,hex_pkt):
protocols="" protocols=""
......
...@@ -4,6 +4,7 @@ from protocol_enumerators import bacnet as bac ...@@ -4,6 +4,7 @@ from protocol_enumerators import bacnet as bac
from protocol_enumerators import modbus from protocol_enumerators import modbus
from protocol_enumerators import dnp3_enum from protocol_enumerators import dnp3_enum
from protocol_enumerators import fox_enum from protocol_enumerators import fox_enum
from protocol_enumerators import proconos_enum
from protocol_enumerators import codesys from protocol_enumerators import codesys
from helpers.port_service_helper import psdata from helpers.port_service_helper import psdata
from protocol_enumerators import omron from protocol_enumerators import omron
...@@ -76,6 +77,12 @@ def analyse_protocol(protocols, pkt): ...@@ -76,6 +77,12 @@ def analyse_protocol(protocols, pkt):
firmware = "Application Version: "+res['Application Version']+"VM Version:"+res['VM Version'] firmware = "Application Version: "+res['Application Version']+"VM Version:"+res['VM Version']
model = res["Application Name"] model = res["Application Name"]
operating_sys=res['OS Name'] operating_sys=res['OS Name']
elif("proconos" in protocols):
res=proconos_enum.get_info(pa.get_ip(inhex) ,int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int(pa.get_udp_port(inhex)))
if(res):
dev_type = res['PLC Type'] if res['PLC Type'] != "" else "ProconOs"
firmware = res['Ladder Logic Runtime']
model = res['Project Name']
else: else:
port_no = str(pa.get_tcp_port(inhex)) if "tcp" in protocols else str( port_no = str(pa.get_tcp_port(inhex)) if "tcp" in protocols else str(
pa.get_udp_port(inhex)) if "udp" in protocols else "Unknown" pa.get_udp_port(inhex)) if "udp" in protocols else "Unknown"
...@@ -149,7 +156,12 @@ def update_protocol(protocols,pkt): ...@@ -149,7 +156,12 @@ def update_protocol(protocols,pkt):
firmware = "Application Version: "+res['Application Version']+"VM Version:"+res['VM Version'] firmware = "Application Version: "+res['Application Version']+"VM Version:"+res['VM Version']
model = res["Application Name"] model = res["Application Name"]
operating_sys=res['OS Name'] operating_sys=res['OS Name']
elif("proconos" in protocols):
res=proconos_enum.get_info(pa.get_ip(inhex) ,int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int(pa.get_udp_port(inhex)))
if(res):
dev_type = res['PLC Type'] if res['PLC Type'] else "ProconOs"
firmware = res['Ladder Logic Runtime']
model = res['Project Name']
return dev_type,vendor,firmware,model,operating_sys return dev_type,vendor,firmware,model,operating_sys
except Exception as e: except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info() exc_type, exc_obj, exc_tb = sys.exc_info()
......
def protocol_identify(hex_pkt):
if (b'cc01' in hex_pkt and hex_pkt[108:112] == b'cc01'):
return ":proconos"
\ No newline at end of file
import socket
import struct
import json
def send_request(host, port):
# Create the initial request packet
req_info = bytes.fromhex("cc01000b4002000047ee")
# Create a socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Connect to the host and port
sock.connect((host, port))
# Send the request
sock.sendall(req_info)
# Receive the response
response = sock.recv(1024) # Adjust the buffer size as needed
# Check if the response is valid
if response and response[0] == 0xcc:
return response
except Exception as e:
print(f"Error: {e}")
finally:
sock.close()
return None
def parse_response(response):
output = {}
# Extract data from the response using struct.unpack
runtime = response[12:43]
plc_type = response[43:72]
project_name = response[76:88]
boot_project = response[88:100]
project_source_code = response[100:106]
output["Ladder Logic Runtime"] = runtime.decode('utf-8').strip("\x00")
output["PLC Type"] = plc_type.decode('utf-8').strip("\x00")
output["Project Name"] = project_name.decode('utf-8').strip("\x00")
output["Boot Project"] = boot_project.decode('utf-8').strip("\x00")
output["Project Source Code"] = project_source_code.decode('utf-8').strip("\x00")
return output
def action(host, port):
response = send_request(host, port)
if response:
result = parse_response(response)
return result
def get_info(ip,port):
return(action(ip,port))
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment