Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
A
Asset_Discovery
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
CI / CD Analytics
Repository Analytics
Value Stream Analytics
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
yogesh.m
Asset_Discovery
Commits
11fee771
Commit
11fee771
authored
Sep 08, 2023
by
yogesh.m
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added dnp3
parent
32d7bbfd
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
112 additions
and
3 deletions
+112
-3
config.yaml
config.yaml
+1
-1
helpers/Packet_Analyzer.py
helpers/Packet_Analyzer.py
+2
-2
protocol_actions.py
protocol_actions.py
+9
-0
protocol_discover_helpers/dnp3_discover.py
protocol_discover_helpers/dnp3_discover.py
+3
-0
protocol_enumerators/dnp3_enum.py
protocol_enumerators/dnp3_enum.py
+97
-0
No files found.
config.yaml
View file @
11fee771
...
...
@@ -4,7 +4,7 @@ configuration:
unifytwin_server_ip_address
:
https://webhook.site/43393e36-6b04-4481-8485-fad2c7cd549f
#Data will be sent back here
edge_device_location
:
Dalmia Cement
null_loopback
:
False
interfaces
:
Wi-Fi
interfaces
:
eth0
blacklist_ip
:
[
'
46.4.105.116'
,
'
172.67.214.157'
,
'
3.6.115.64'
,
'
104.21.53.154'
]
blacklist_dns
:
[
'
webhook.site.'
]
ui_host
:
0.0.0.0
...
...
helpers/Packet_Analyzer.py
View file @
11fee771
import
binascii
import
json
from
protocol_discover_helpers
import
modbus_discover
,
enip_discover
,
s7_discover
,
omron_discover
,
bacnet_discover
,
codesys_discover
from
protocol_discover_helpers
import
modbus_discover
,
enip_discover
,
s7_discover
,
omron_discover
,
bacnet_discover
,
codesys_discover
,
dnp3_discover
class
Packet_Analyzer
():
def
__init__
(
self
):
self
.
data
=
None
self
.
protocol_list
=
{
502
:
modbus_discover
,
44818
:
enip_discover
,
102
:
s7_discover
,
9600
:
omron_discover
,
47808
:
bacnet_discover
,
2455
:
codesys_discover
}
self
.
protocol_list
=
{
502
:
modbus_discover
,
44818
:
enip_discover
,
102
:
s7_discover
,
9600
:
omron_discover
,
47808
:
bacnet_discover
,
2455
:
codesys_discover
,
20000
:
dnp3_discover
}
def
identify_protocol
(
self
,
hex_pkt
):
protocols
=
""
...
...
protocol_actions.py
View file @
11fee771
...
...
@@ -2,6 +2,7 @@ from protocol_enumerators import ethernetip_enum as eip
from
protocol_enumerators
import
s7_enum
as
s7
from
protocol_enumerators
import
bacnet
as
bac
from
protocol_enumerators
import
modbus
from
protocol_enumerators
import
dnp3_enum
from
protocol_enumerators
import
codesys
from
helpers.port_service_helper
import
psdata
from
protocol_enumerators
import
omron
...
...
@@ -61,6 +62,10 @@ def analyse_protocol(protocols, pkt):
vendor
=
'Omron Devices'
firmware
=
res
[
'Controller Version'
]
model
=
res
[
'Controller Model'
]
elif
(
"dnp3"
in
protocols
):
res
=
dnp3_enum
.
get_info
(
pa
.
get_ip
(
inhex
)
,
int
(
pa
.
get_tcp_port
(
inhex
))
if
"tcp"
in
protocols
else
int
(
pa
.
get_udp_port
(
inhex
)))
if
(
res
):
dev_type
=
res
[
'Device Type'
]
else
:
port_no
=
str
(
pa
.
get_tcp_port
(
inhex
))
if
"tcp"
in
protocols
else
str
(
pa
.
get_udp_port
(
inhex
))
if
"udp"
in
protocols
else
"Unknown"
...
...
@@ -121,6 +126,10 @@ def update_protocol(protocols,pkt):
vendor
=
'Omron Devices'
firmware
=
res
[
'Controller Version'
]
model
=
res
[
'Controller Model'
]
elif
(
"dnp3"
in
protocols
):
res
=
dnp3_enum
.
get_info
(
pa
.
get_ip
(
inhex
)
,
int
(
pa
.
get_tcp_port
(
inhex
))
if
"tcp"
in
protocols
else
int
(
pa
.
get_udp_port
(
inhex
)))
if
(
res
):
dev_type
=
res
[
'Device Type'
]
return
dev_type
,
vendor
,
firmware
,
model
except
Exception
as
e
:
exc_type
,
exc_obj
,
exc_tb
=
sys
.
exc_info
()
...
...
protocol_discover_helpers/dnp3_discover.py
0 → 100644
View file @
11fee771
def
protocol_identify
(
hex_pkt
):
if
(
b
'0564'
in
hex_pkt
and
hex_pkt
[
108
:
112
]
==
b
'0564'
):
return
":dnp3"
\ No newline at end of file
protocol_enumerators/dnp3_enum.py
0 → 100644
View file @
11fee771
import
socket
import
binascii
# Datalink Function Codes PRM=0
function_id
=
{
0
:
"ACK"
,
1
:
"NACK"
,
11
:
"Link Status"
,
15
:
"User Data"
}
# Data Link Function Codes PRM=1
alt_function_id
=
{
0
:
"RESET Link"
,
1
:
"Reset User Process"
,
2
:
"TEST link"
,
3
:
"User Data"
,
4
:
"User Data"
,
9
:
"Request Link Status"
}
def
funct_lookup
(
id
):
if
len
(
str
(
id
))
-
2
<
5
:
funct_id
=
function_id
.
get
(
int
(
id
,
2
),
"Unknown Function ID"
)
id
=
int
(
id
,
2
)
else
:
first_value
=
ord
(
id
[
1
])
%
0x10
second_value
=
int
(
id
[
4
:
8
],
2
)
if
first_value
==
0
:
funct_id
=
function_id
.
get
(
second_value
,
"Unknown Function ID"
)
else
:
funct_id
=
alt_function_id
.
get
(
second_value
,
"Unknown Function ID"
)
id
=
second_value
return
f
"{funct_id} ({id})"
def
action
(
host
,
port
):
# Create a new socket
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
sock
.
settimeout
(
1
)
# Set timeout to 1 second
try
:
# Connect to the remote host
sock
.
connect
((
str
(
host
),
port
))
# Query to pull the first 100 addresses
first100
=
binascii
.
unhexlify
(
"056405C900000000364C056405C901000000DE8E056405C9020000009F84056405C9030000007746056405C9040000001D90056405C905000000F552056405C906000000B4580"
+
"56405C9070000005C9A056405C90800000019B9056405C909000000F17B056405C90A000000B071056405C90B000000058B3056405C90C0000003265056405C90D000000DAA705"
+
"56405C90E0000009BAD056405C90F000000736F056405C91000000011EB056405C911000000F929056405C912000000B823056405C91300000050E1056405C9140000003A37056"
+
"405C915000000D2F5056405C91600000093FF056405C9170000007B3D056405C9180000003E1E056405C919000000D6DC056405C91A00000097D6056405C91B0000007F140564"
+
"05C91C00000015C2056405C91D000000FD00056405C91E000000BC0A056405C91F00000054C8056405C920000000014F056405C921000000E98D056405C922000000A887056405"
+
"C9230000004045056405C9240000002A93056405C925000000C251056405C926000000835B056405C9270000006B99"
)
# Send the query for the first 100 addresses
sock
.
send
(
first100
)
# Receive the response for parsing
response
=
sock
.
recv
(
1024
)
# Adjust the buffer size as needed
# If the response was timeout, return that we had a timeout
if
not
response
:
sock
.
close
()
return
False
# Check if the response starts with 0x0564
if
response
.
startswith
(
b
'
\x05\x64
'
):
# Close the socket
sock
.
close
()
# Unpack the bit string for PRM checking as well as function codes
ctrl
=
response
[
3
]
dstadd
=
int
.
from_bytes
(
response
[
5
:
6
],
byteorder
=
'big'
)
srceadd
=
int
.
from_bytes
(
response
[
6
:
7
],
byteorder
=
'big'
)
# Set up an output dictionary with values
output
=
{
"Source Address"
:
srceadd
,
"Destination Address"
:
dstadd
,
"Device Type"
:
"DNP3"
,
"Control"
:
funct_lookup
(
bin
(
ctrl
)),
}
# Return the output
return
output
else
:
sock
.
close
()
return
False
except
socket
.
error
as
e
:
return
False
finally
:
sock
.
close
()
def
get_info
(
ip
,
port
):
return
(
action
(
ip
,
port
))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment