Commit 09a2ad8d authored by yogesh.m's avatar yogesh.m

Merge branch 'main' into 'main'

Main

See merge request !1
parents 179a2e62 f42bb7b2
*.pyc
*.cypython-311.pyc
assets.json
migrations
\ No newline at end of file
This diff is collapsed.
No preview for this file type
import binascii import binascii
import json import json
from protocol_discover_helpers import modbus_discover,enip_discover,s7_discover,omron_discover,bacnet_discover from protocol_discover_helpers import modbus_discover,enip_discover,s7_discover,omron_discover,bacnet_discover,codesys_discover
class Packet_Analyzer(): class Packet_Analyzer():
def __init__(self): def __init__(self):
self.data=None self.data=None
self.protocol_list={502: modbus_discover,44818:enip_discover,102:s7_discover,9600:omron_discover,47808:bacnet_discover} self.protocol_list={502: modbus_discover,44818:enip_discover,102:s7_discover,9600:omron_discover,47808:bacnet_discover,2455:codesys_discover}
def identify_protocol(self,hex_pkt): def identify_protocol(self,hex_pkt):
protocols="" protocols=""
......
...@@ -2,6 +2,7 @@ from protocol_enumerators import ethernetip_enum as eip ...@@ -2,6 +2,7 @@ from protocol_enumerators import ethernetip_enum as eip
from protocol_enumerators import s7_enum as s7 from protocol_enumerators import s7_enum as s7
from protocol_enumerators import bacnet as bac from protocol_enumerators import bacnet as bac
from protocol_enumerators import modbus from protocol_enumerators import modbus
from protocol_enumerators import codesys
from helpers.port_service_helper import psdata from helpers.port_service_helper import psdata
from protocol_enumerators import omron from protocol_enumerators import omron
import binascii import binascii
...@@ -42,6 +43,12 @@ def analyse_protocol(protocols,pkt): ...@@ -42,6 +43,12 @@ def analyse_protocol(protocols,pkt):
vendor = res['vendorid'] vendor = res['vendorid']
firmware = res['firmware'] firmware = res['firmware']
model = res['model'] model = res['model']
elif ("codesys" in protocols):
res = codesys.get_info(eip.get_info(pa.get_ip(inhex), int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int(
pa.get_udp_port(inhex))))
if (res):
dev_type = res['OS Name']
vendor = res['Product Type']
elif ("modbus" in protocols): elif ("modbus" in protocols):
res = modbus.get_info(eip.get_info(pa.get_ip(inhex), int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int( res = modbus.get_info(eip.get_info(pa.get_ip(inhex), int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int(
pa.get_udp_port(inhex))), False) pa.get_udp_port(inhex))), False)
...@@ -99,6 +106,11 @@ def update_protocol(protocols,pkt): ...@@ -99,6 +106,11 @@ def update_protocol(protocols,pkt):
vendor=res['vendorid'] vendor=res['vendorid']
firmware=res['firmware'] firmware=res['firmware']
model=res['model'] model=res['model']
elif("codesys" in protocols):
res=codesys.get_info(pa.get_ip(inhex) ,int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int(pa.get_udp_port(inhex)))
if(res):
dev_type=res['OS Name']
vendor=res['Product Type']
elif("modbus" in protocols): elif("modbus" in protocols):
res=modbus.get_info(pa.get_ip(inhex) ,int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int(pa.get_udp_port(inhex)),False) res=modbus.get_info(pa.get_ip(inhex) ,int(pa.get_tcp_port(inhex)) if "tcp" in protocols else int(pa.get_udp_port(inhex)),False)
if(res): if(res):
......
def protocol_identify(hex_pkt):
if (b'bbbb' in hex_pkt and hex_pkt[108:112] == b'bbbb'):
return ":codesys"
\ No newline at end of file
import binascii
from scapy.all import *
import socket import socket
import binascii
def action(host, port):
# CoDeSyS little endian query
lile_query = binascii.unhexlify("bbbb0100000001")
# CoDeSyS big endian query
bige_query = binascii.unhexlify("bbbb0100000101")
# Create a socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Connect to the remote host
sock.connect((host, port))
# Send the little endian query
sock.send(lile_query)
# Receive the response
response = sock.recv(1024)
# If there was no response, try the big endian query
if not response:
sock.send(bige_query)
response = sock.recv(1024)
# Check if the response starts with 0xbb
if response and response[0] == 0xbb:
# Extract the null-terminated strings (OS Name, OS Type, Product Type)
os_name_end = response.find(b'\x00', 64)
os_name = response[64:os_name_end].decode()
os_type_end = response.find(b'\x00', 96)
os_type = response[96:os_type_end].decode()
product_type_end = response.find(b'\x00', 128)
product_type = response[128:product_type_end].decode()
# Close the socket
sock.close()
output = {
"OS Name": os_name,
"Product Type": product_type,
"Device IP":host,
"Port":port
}
return output
except Exception as e:
print(f"Error: {e}")
return None
def action(host,port): def get_info(ip,port):
output={} return(action(ip,port))
cotp=binascii.unhexlify('0300001611e00000001400c1020100c2020102c0010a') \ No newline at end of file
alt_COTP = binascii.unhexlify("0300001611e00000000500c1020100c2020200c0010a")
ROSCTR_Setup = binascii.unhexlify("0300001902f08032010000000000080000f0000001000101e0")
Read_SZL = binascii.unhexlify("0300002102f080320700000000000800080001120411440100ff09000400110001")
first_SZL_Request = binascii.unhexlify("0300002102f080320700000000000800080001120411440100ff09000400110001")
second_SZL_Request = binascii.unhexlify("0300002102f080320700000000000800080001120411440100ff090004001c0001")
response=None
pkt = Ether(cotp)
MESSAGE = pkt
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.settimeout(3)
s.connect((host,port))
except:
return False
response=send_receive(s,cotp)
if(response):
if(hex(response[5])!="0xd0"):
s.close()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host,port))
response=send_receive(s,alt_COTP)
if(response):
if(hex(response[5])!="0xd0"):
return False
response = send_receive(s,ROSCTR_Setup)
if(response):
if(hex(response[7])!="0x32"):
return False
response = send_receive(s,Read_SZL)
if(response):
if(hex(response[7])!="0x32"):
return False
response = send_receive(s, first_SZL_Request)
try:
output = first_parse_response(response,output)
except:
return False
response = send_receive(s, second_SZL_Request)
output=second_parse_response(response,output)
output["DeviceIP"]=host
output["Port"]=port
return output
\ No newline at end of file
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment