Nexpose Vulnerability Scan Using the REST API
Vulnerability scanning tools help you identify vulnerabilities in your infrastructure. You can run manual and automated scans, but in some cases you may need to integrate your scanner with another tool, such as a DevOps pipeline (Jenkins) or a VM build pipeline.
You cannot rely on automated scans in this type of integration. For example, let's say that you have a build pipeline that generates new virtual machines (VMs). You can create a number of machines in a day by using this pipeline. After the build process, you need to check that the asset doesn’t have any high or critical vulnerabilities, so your deployment process should be blocked if there is any high or critical vulnerability on the asset.
In this post, I want to give you a working example that shows how to initiate a scan using the Nexpose API. I will create a Python script for this. Rapid7 has good documentation for the Nexpose API. For any kind of automation with Nexpose, this is the best resource.
Prepare Nexpose Infrastructure
Before starting on the script, we need to make some changes in Nexpose. I am not going to explain the details here, but I made the changes below.
- Create a site in Nexpose
- Set a default scan template for the site
- Create an account for the API. The account should be able to initiate scans on the site.
In my Nexpose configuration, each site has been created for a different purpose, so I think it is better to create another site for the API scan. But you can use an existing site in your infrastructure.
Manual Test
Now we can start coding. My code has two basic steps:
- Add the asset you want to scan into the site
- Initiate the nexpose scan on the site
1: Add the assets you want to scan
This step is important. Before initiating a scan, we need to update the asset list on the site, because Nexpose scans the assets listed on the site. To add the asset list to the site, I use the API call below.
curl --request PUT \
--url https://NexposeServer/api/3/sites/<site-id>/included_targets \
--header 'Authorization: <auth-key>' \
--header 'Content-Type: application/json' \
--data '[
"10.99.55.129",
"172.16.13.45"
]'
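The `<auth-key>` placeholder in the request above is not spelled out. The Python script later in this post authenticates with HTTP Basic, so one plausible reading is that the header carries a Basic credential. Here is a minimal sketch of building that value under that assumption; `basic_auth_header` is a hypothetical helper name, not part of any Nexpose tooling.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Return an HTTP Basic 'Authorization' header value (RFC 7617)."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return f"Basic {token}"
```

The returned string would take the place of `<auth-key>` in the curl commands. With the `requests` library you do not need this helper, because `HTTPBasicAuth` builds the same header for you.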
3: Start the scan
The API call to initiate the scan is simple.
curl --request POST \
--url https://nexpose:3780/api/3/sites/<site-id>/scans \
--header 'Authorization: <auth-key>'
4: Check whether the scan has completed
When you initiate a scan successfully, the server returns the scan ID in the response. You can use this ID to check whether the scan has completed.
curl --request GET \
--url https://NexposeServer/api/3/scans/73341 \
--header 'Authorization: <auth-key>'
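In a pipeline, the status request above is usually repeated until the scan ends. Here is a minimal polling sketch. The names `wait_for_scan` and `TERMINAL_STATUSES` are hypothetical, and the set of terminal status strings is an assumption that you should check against the API documentation for your console version. The status lookup is passed in as a callable, so the loop itself does not depend on any HTTP library.

```python
import time

# Assumed terminal states; verify them against your console's API documentation.
TERMINAL_STATUSES = {"finished", "stopped", "aborted", "error"}


def wait_for_scan(get_status, scan_id, interval=30.0, timeout=3600.0, sleep=time.sleep):
    """Poll get_status(scan_id) until it returns a terminal status.

    get_status is any callable that returns the scan's status string, for
    example a wrapper around GET /api/3/scans/<scan-id>.
    Raises TimeoutError if the scan is still not terminal after `timeout` seconds.
    """
    waited = 0.0
    while True:
        status = get_status(scan_id)
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout:
            raise TimeoutError(f"scan {scan_id} still '{status}' after {timeout} seconds")
        sleep(interval)
        waited += interval
```

A build step could call `wait_for_scan(getNexposeScanStatus, scan_id)` and only fetch findings when the result is `"finished"`.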
5: Get Asset Vulnerabilities
After the scan is completed, you can pull the vulnerability information from Nexpose. It is a simple process, but there are two steps. We need to get the asset ID first, then we can get the findings by using the asset ID in the request. You will need to use pagination to get all findings for the asset, but I did not use pagination in my code here.
Here is the query to get the asset ID:
curl --request POST \
--url https://NexposeServer/api/3/assets/search \
--header 'Authorization: <auth-key>' \
--header 'Content-Type: application/json' \
--data '{
"filters": [
{
"field": "ip-address",
"operator": "is",
"value": "10.99.55.129"
}
],
"match": "all"
}'
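The search can return no match, for example when the IP address has never been scanned. Here is a small sketch of reading the asset ID defensively. It assumes the response body has a `resources` list whose items carry an `id`, which is the shape the script later in this post relies on. `extract_asset_id` is a hypothetical helper name.

```python
def extract_asset_id(search_response):
    """Return the id of the first matching asset, or None if nothing matched."""
    resources = (search_response or {}).get("resources") or []
    if not resources:
        return None
    return resources[0].get("id")
```

Returning `None` instead of raising lets the caller decide whether a missing asset should fail the pipeline or only be logged.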
The request to get findings:
curl --request GET \
--url 'https://NexposeServer/api/3/assets/<asset-id>/vulnerabilities?page=0&size=500' \
--header 'Authorization: <auth-key>' \
--header 'Content-Type: application/json'
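The request above reads only the first page of up to 500 findings. Here is a hedged sketch of walking all pages. It assumes each response body contains a `resources` list and a `page` object with a `totalPages` field; the post itself only shows `resources`, so treat the `page` fields as an assumption to verify. `iter_all_resources` and `fetch_page` are hypothetical names.

```python
def iter_all_resources(fetch_page, size=500):
    """Yield every item from a paged endpoint.

    fetch_page(page, size) must return the decoded JSON body of one page,
    e.g. by calling .../vulnerabilities?page=<page>&size=<size>.
    """
    page = 0
    while True:
        body = fetch_page(page, size)
        resources = body.get("resources", [])
        yield from resources
        # Assumed field: total number of pages reported by the server.
        total_pages = body.get("page", {}).get("totalPages", 0)
        page += 1
        if not resources or page >= total_pages:
            break
```

Because the HTTP call is injected, the same loop works for any paged endpoint and can be exercised without a live server.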
The Script
Using the requests above, I created two scripts: one to initiate a scan and one to get the scan results.
This is the script to initiate a scan:
import sys, os, json
import requests
import requests.packages.urllib3
from requests.auth import HTTPBasicAuth

# Suppress the TLS warnings caused by verify=False in the calls below.
requests.packages.urllib3.disable_warnings()

# Credentials come from the environment; stray double quotes are stripped.
nexpose_username = os.environ['NEXPOSE_USERNAME'].replace('"', "")
nexpose_password = os.environ['NEXPOSE_PASSWORD'].replace('"', "")
nexpose_asset_group_id = 332  # ID of the Nexpose site used for API scans
nexpose_db_scan_type = "nexpose"  # not required

def addIpToNexposeSite(ip_address):
    # Send the list of IP addresses as the site's included targets.
    print("addIpToNexposeSite")
    payload = ip_address
    print(ip_address)
    headers = {
        'Content-Type': "application/json"
    }
    url = "https://NexposeServer/api/3/sites/" + str(nexpose_asset_group_id) + "/included_targets"
    response = requests.put(
        url,
        headers=headers,
        auth=HTTPBasicAuth(nexpose_username, nexpose_password),
        data=json.dumps(payload),
        verify=False
    )
    if response.status_code != 200:
        print("COULD NOT UPDATE SITE TARGETS!!")
        return 1
    return 0

def getNexposeScanStatus(scan_id):
    # Return the status string of a scan, or 0 if the request failed.
    url = "https://NexposeServer/api/3/scans/" + str(scan_id)
    headers = {
        'Content-Type': "application/json",
    }
    response = requests.get(
        url,
        headers=headers,
        auth=HTTPBasicAuth(nexpose_username, nexpose_password),
        verify=False
    )
    if response.status_code != 200:
        print("ERROR - get_scan_status")
        return 0
    scan_status = response.json()['status']
    return scan_status

def startNexposeScan(assets):
    # Update the site's targets, then start a scan; return the scan ID or 0.
    ip_addresses = assets
    scan_id = 0
    print(ip_addresses)
    # Do not scan a stale target list if the update failed.
    if addIpToNexposeSite(ip_addresses) != 0:
        return 0
    headers = {
        'Content-Type': "application/json"
    }
    url = "https://NexposeServer/api/3/sites/" + str(nexpose_asset_group_id) + "/scans"
    response = requests.post(
        url,
        headers=headers,
        auth=HTTPBasicAuth(nexpose_username, nexpose_password),
        verify=False
    )
    if response.status_code != 201:
        print(response.status_code)
        print("SCAN WAS NOT INITIATED!!")
        return 0
    scan_id = response.json()['id']
    return str(scan_id)

if __name__ == "__main__":
    # Every command-line argument is treated as an IP address to scan.
    ips = sys.argv[1:]
    startNexposeScan(ips)
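The scan script passes its command-line arguments to the API as they are. When the input comes from a build pipeline, it may help to reject malformed addresses before calling Nexpose. Here is a minimal sketch using the standard `ipaddress` module. `split_valid_targets` is a hypothetical helper, and it is stricter than the site itself may be, since it accepts only single IP addresses and no host names or ranges.

```python
import ipaddress


def split_valid_targets(candidates):
    """Split candidate strings into (valid_ips, rejected) lists."""
    valid, rejected = [], []
    for item in candidates:
        try:
            # ip_address() raises ValueError for anything that is not an IPv4/IPv6 address.
            valid.append(str(ipaddress.ip_address(item.strip())))
        except ValueError:
            rejected.append(item)
    return valid, rejected
```

For example, the pipeline could stop early when the `rejected` list is not empty and pass only `valid` to `startNexposeScan`.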
This is the script to get the findings:
import sys, os, json
import requests
import requests.packages.urllib3
from requests.auth import HTTPBasicAuth

# Suppress the TLS warnings caused by verify=False in the calls below.
requests.packages.urllib3.disable_warnings()

nexpose_username = os.environ['NEXPOSE_USERNAME'].replace('"', "")
nexpose_password = os.environ['NEXPOSE_PASSWORD'].replace('"', "")

def getVulnDetails(vuln_id):
    # Fetch the full definition (title, CVSS, description) of a vulnerability.
    try:
        url = "https://NexposeServer/api/3/vulnerabilities/" + str(vuln_id)
        headers = {
            'Content-Type': "application/json",
        }
        response = requests.get(
            url,
            headers=headers,
            auth=HTTPBasicAuth(nexpose_username, nexpose_password),
            verify=False
        )
        if response.status_code != 200:
            print("ERROR - getVulnDetails")
            return 0
        return response.json()
    except Exception as e:
        print('Exception - getVulnDetails()')
        return

def getNexposeAssetID(ip_address):
    # Look up the asset ID for an IP address; return a falsy value if not found.
    print(ip_address)
    payload = {
        "filters": [
            {
                "field": "ip-address",
                "operator": "is",
                "value": ip_address
            }
        ],
        "match": "all"
    }
    headers = {
        'Content-Type': "application/json",
    }
    url = "https://NexposeServer/api/3/assets/search"
    response = requests.post(
        url,
        headers=headers,
        auth=HTTPBasicAuth(nexpose_username, nexpose_password),
        data=json.dumps(payload),
        verify=False
    )
    if response.status_code != 200:
        print(str(response.status_code))
        print("ASSET SEARCH FAILED!!")
        return 0
    try:
        aid = response.json()['resources'][0]['id']
        print(aid)
        return aid
    except (KeyError, IndexError):
        print("No asset found")
        return None

def getNexposeVulns(asset_id):
    # Fetch the first page (up to 500 entries) of findings for the asset.
    url = "https://NexposeServer/api/3/assets/" + str(asset_id) + "/vulnerabilities?page=0&size=500"
    headers = {
        'Content-Type': "application/json",
    }
    response = requests.get(
        url,
        headers=headers,
        auth=HTTPBasicAuth(nexpose_username, nexpose_password),
        verify=False
    )
    if response.status_code != 200:
        print("ERROR: getNexposeVulns")
        return 0
    return response.json()

def getFormattedVuln(vuln, ip_address, assetId):
    # Merge a finding with its vulnerability definition into one flat dict.
    vulnDetails = getVulnDetails(vuln['id'])
    if not vulnDetails:
        return
    formattedVuln = {}
    try:
        formattedVuln['appId'] = ""
        formattedVuln['assetId'] = assetId
        formattedVuln['title'] = vulnDetails['title']
        formattedVuln['srcTool'] = 'nexpose'
        formattedVuln['srcToolId'] = vuln['id']
        formattedVuln['status'] = vuln['status']
        formattedVuln['severity'] = formatVulnImpact(vulnDetails['cvss']['v2']['score'])
        formattedVuln['details'] = {}
        formattedVuln['details']['info'] = vulnDetails['description']['html']
        return formattedVuln
    except Exception as e:
        print('Exception: getFormattedVuln')
        print(e)
        return

def formatVulnImpact(impact):
    # Map a CVSS v2 score to a severity label.
    formattedImpact = 'info'
    if impact < 3.4:
        formattedImpact = 'low'
    elif impact < 6.7:
        formattedImpact = 'medium'
    elif impact < 9:
        formattedImpact = 'high'
    elif impact >= 9:
        formattedImpact = 'critical'
    return formattedImpact

def printVulnerabilitiesOnCLI(vuln):
    data = {
        "Title": vuln['title'],
        "ID": vuln['srcToolId']
    }
    print(data)

if __name__ == "__main__":
    ip = sys.argv[1]
    asset_id = getNexposeAssetID(ip)
    if not asset_id:
        sys.exit(1)
    raw_vulns = getNexposeVulns(asset_id)
    if not raw_vulns:
        sys.exit(1)
    try:
        for vuln in raw_vulns['resources']:
            if vuln['status'] == "vulnerable":
                prep_vuln = getFormattedVuln(vuln, ip, asset_id)
                # Skip findings whose details could not be loaded.
                if prep_vuln:
                    printVulnerabilitiesOnCLI(prep_vuln)
    except Exception as e:
        print('Exception in scan()')
        print(e)
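The goal stated at the start of this post is to block a deployment when an asset has high or critical findings, but the script only prints them. Here is a sketch of that last step, assuming each formatted finding is a dict with a `severity` key like the one `getFormattedVuln` builds. `summarize`, `should_block` and `BLOCKING_SEVERITIES` are hypothetical names.

```python
from collections import Counter

# Severity labels that should stop the deployment.
BLOCKING_SEVERITIES = {"high", "critical"}


def summarize(findings):
    """Count findings per severity label; entries that are None are skipped."""
    return Counter(f["severity"] for f in findings if f)


def should_block(counts):
    """Return True if any blocking severity has at least one finding."""
    return any(counts.get(level, 0) > 0 for level in BLOCKING_SEVERITIES)
```

In the results script, the formatted findings could be collected into a list instead of only being printed, and the pipeline step could then exit with a non-zero status when `should_block(summarize(findings))` is true.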
Here is the command line output of the scripts above:
[user@rhost]$ python3 nexposeResults.py 10.99.55.129
10.99.55.129
806026
{'Title': 'IBM Java: Oracle July 14 2020 CPU (CVE-2020-14556)', 'ID': 'ibm-java-cve-2020-14556'}
{'Title': 'IBM Java: Oracle July 14 2020 CPU (CVE-2020-14577)', 'ID': 'ibm-java-cve-2020-14577'}
{'Title': 'IBM Java: Oracle July 14 2020 CPU (CVE-2020-14578)', 'ID': 'ibm-java-cve-2020-14578'}
{'Title': 'IBM Java: Oracle July 14 2020 CPU (CVE-2020-14579)', 'ID': 'ibm-java-cve-2020-14579'}
{'Title': 'IBM Java: Oracle July 14 2020 CPU (CVE-2020-14583)', 'ID': 'ibm-java-cve-2020-14583'}
{'Title': 'IBM Java: Oracle July 14 2020 CPU (CVE-2020-14593)', 'ID': 'ibm-java-cve-2020-14593'}
{'Title': 'IBM Java: Oracle July 14 2020 CPU (CVE-2020-14621)', 'ID': 'ibm-java-cve-2020-14621'}
{'Title': 'Java CPU October 2017 Java SE, Java SE Embedded, JRockit vulnerability (CVE-2016-10165)', 'ID': 'jre-vuln-cve-2016-10165'}
{'Title': 'Java CPU January 2017 Java SE, Java SE Embedded Libraries vulnerability (CVE-2016-2183)', 'ID': 'jre-vuln-cve-2016-2183'}
{'Title': 'Java CPU January 2017 Java SE, Java SE Embedded, JRockit Libraries vulnerability (CVE-2016-5546)', 'ID': 'jre-vuln-cve-2016-5546'}
{'Title': 'Java CPU January 2017 Java SE, Java SE Embedded, JRockit Libraries vulnerability (CVE-2016-5547)', 'ID': 'jre-vuln-cve-2016-5547'}
{'Title': 'Java CPU January 2017 Java SE, Java SE Embedded Libraries vulnerability (CVE-2016-5548)', 'ID': 'jre-vuln-cve-2016-5548'}
{'Title': 'Java CPU January 2017 Java SE, Java SE Embedded Libraries vulnerability (CVE-2016-5549)', 'ID': 'jre-vuln-cve-2016-5549'}
{'Title': 'Java CPU January 2017 Java SE, Java SE Embedded, JRockit Networking vulnerability (CVE-2016-5552)', 'ID': 'jre-vuln-cve-2016-5552'}
{'Title': 'Java CPU January 2017 Java SE Java Mission Control vulnerability (CVE-2016-8328)', 'ID': 'jre-vuln-cve-2016-8328'}
{'Title': 'Java CPU October 2017 Java SE, Java SE Embedded vulnerability (CVE-2016-9841)', 'ID': 'jre-vuln-cve-2016-9841'}
{'Title': 'Java CPU July 2017 Java SE, Java SE Embedded, JRockit vulnerability (CVE-2017-10053)', 'ID': 'jre-vuln-cve-2017-10053'}
{'Title': 'Java CPU July 2017 Java SE vulnerability (CVE-2017-10067)', 'ID': 'jre-vuln-cve-2017-10067'}
{'Title': 'Java CPU July 2017 Java SE, Java SE Embedded vulnerability (CVE-2017-10074)', 'ID': 'jre-vuln-cve-2017-10074'}
....
....
SNIP
....
....
[user@rhost]$ python3 nexposeScan.py 10.99.55.129 10.6.55.152
['10.99.55.129', '10.6.55.152']
addIpToNexposeSite