Fortinet black logo

Administration Guide

Appendix B - Sample script to submit files

Appendix B - Sample script to submit files

This is a sample script in python to submit files and retrieve results from FortiNDR.

#!/usr/bin/python3

# Version 1.0
# par Fortinet
# Jan 2021

import os
import requests
import getopt
import argparse
import simplejson as json
from base64 import b64encode, b64decode
import urllib3
import sys
import gzip
import subprocess
import urllib.request
import validators
from fake_useragent import UserAgent
import locale
from bs4 import BeautifulSoup
import requests


host = "IP"
AI_api_key = "API_KEY"

# Please be careful when regenerate api token. Once new token has been generated, old one will be invalid.


class FAIApiClient_file():

	def __init__(self, url):
		self.url = 'https://' + url + '/api/v1/files?access_token=' + AI_api_key
		self.body = {"file_name": "",
			     "file_content": "",
			     "password": ""}

	def _handle_post(self, data):
		"""
		POST JSON request..

		@type data: dict
		@param data: JSON request data.
		@rtype: HttpResponse
		@return: JSON response data.
		"""
		response = requests.post(self.url, data=json.dumps(data), verify=False)

		return response

	def _load_file_for_upload(self, path_to_file, test_input, filename=''):
		"""
		Load file contents into input mapping.

		@type path_to_file: basestring
		@param path_to_file: files absolute path.
		@type test_input: dict
		@param test_input: JSON request data.
		@type filename: basestring
		@param filename: filename override optional param.
		@rtype: dict
		@return: updated JSON request dict.
		"""
		with open(path_to_file, 'rb') as f:
			data = f.read()
		filename = os.path.basename(path_to_file) if not filename else filename
		test_input['file_name'] = b64encode(filename.encode('utf-8'))
		test_input['file_content'] = b64encode(data)
		test_input['password'] = "1"
		return test_input

	def send_file(self, OVERRIDE_FILE = '../Resources/samples.zip'):
		# NOTE: 'OVERRIDE_FILE' should be the absolute path to the file.
		#       When submitting a file via API the noted file ('OVERRIDE_FILE')
		#       will be used as an OVERRIDE.
		test_input = self.body
		test_input = self._load_file_for_upload(OVERRIDE_FILE, test_input)
		response = self._handle_post(test_input)
		return response

	def _load_memory_for_upload(self, text_data, test_input, filename=''):
		"""
		Load file contents into input mapping.

		@type path_to_file: basestring
		@param path_to_file: files absolute path.
		@type test_input: dict
		@param test_input: JSON request data.
		@type filename: basestring
		@param filename: filename override optional param.
		@rtype: dict
		@return: updated JSON request dict.
		"""
		
		tmp_str = ""  

		data = b64encode(text_data)

		test_input['file_name'] = b64encode(filename.encode('utf-8'))
		test_input['file_content'] = data
		test_input['password'] = "1"
		return test_input

	def send_url(self, url_page,filename):
		# NOTE: 'OVERRIDE_FILE' should be the absolute path to the file.
		#       When submitting a file via API the noted file ('OVERRIDE_FILE')
		#       will be used as an OVERRIDE.
		test_input = self.body
		test_input = self._load_memory_for_upload(url_page, test_input,filename)
		response = self._handle_post(test_input)
		return response
		
def crawl(url,depth):

	count = 3  # amount of urls in each level
	url_list_depth = [[] for i in range(0, depth + 1)]
	url_list_depth[0].append(url)
	for depth_i in range(0, depth):
		for links in url_list_depth[depth_i]:
			valid = True
			try:
				response = requests.get(links,verify=False)
				
			except (requests.exceptions.InvalidSchema,requests.exceptions.MissingSchema,requests.exceptions.SSLError) as e:
				valid = False
				
			if (valid):
				soup = BeautifulSoup(response.text, 'html.parser')
				tags = soup.find_all('a')
				for link in tags:
					url_new = link.get('href')
					flag = False
					for item in url_list_depth:
						for l in item:
							if url_new == l:
								flag = True

					if url_new is not None and "http" in url_new and flag is False:
						url_list_depth[depth_i + 1].append(url_new)
						#print(links, "->", url_new)

			else:
				parse_url (links)


	return (url_list_depth)
	
def load_file_for_upload(path_to_file):
	
	with open(path_to_file, 'rb') as f:
		data = f.read()

	return gzip.compress(data)

def check_file_id(host, file_id):
	data = ""
	results_output	= ""

	tmp_url = "https://" + str(host) + "/api/v1/verdict?access_token=" + str(AI_api_key) + "&fileid=" + str(file_id)
	command= "curl -k -X GET \""+ tmp_url  + "\"  -H \"Content-Type: application/json\" "
	
	try:
		results_output = subprocess.check_output(command, shell=True)
		data =  json.loads(results_output)
		
	except subprocess.CalledProcessError as e:

		print(e)
		sys.exit(0)

	return (data)

def check_submission_results (submit_id,filename):
	data = ""
	results_output	= ""

	tmp_url = "https://" + str(host) + "/api/v1/verdict?access_token=" + str(AI_api_key) + "&sid=" + str(submit_id)
	command= "curl -k -X GET \""+ tmp_url  + "\"  -H \"Content-Type: application/json\" "
	
	try:
		results_output = subprocess.check_output(command, shell=True)
		data =  json.loads(results_output)

		if (len(data) > 0):
			for key in data:
				if (key == "results"):
				   tmp_data = data[key]
				   for key, value in tmp_data.items():
					if (key == "fileids"):
						if (len(value) > 0):
						   for i in range(0,len(value)):
							file_id = value[i]
							new_data  = "DATA_IN_PROCESS"
							stop = True
							i = 1
							while stop:
							   new_data = check_file_id(host, file_id)
							   tmp_check = str(new_data)
							   i = i + 1
										
							   if (not ("DATA_IN_PROCESS" in tmp_check)):
							      stop = False
							   elif (i == 50 ):
							      stop = False
							      break

									
							results_metadata = "filename:" + str(filename)
							if (len(new_data) > 0):
							   for key in data:
							      if (key == "results"):
								try:
								   tmp_data = new_data[key]
								   for key, value in tmp_data.items():
							  	      results_metadata = results_metadata + "," 
+ str(key) + ":" + str(value)
								except KeyError as e:
								   next
													
							print (results_metadata)

							else:
								print ("filename:" + str(filename)  + ",NO RESULTS")		
	except subprocess.CalledProcessError as e:

		sys.exit(0)

def parse_url (tmp_url):

	client = FAIApiClient_file(host)

	if (validators.url(tmp_url)):
		ua = UserAgent()
		the_page = ""
		
		try:
			request = urllib.request.Request(tmp_url, data=None, headers={'User-Agent':  str(ua)})
			response = urllib.request.urlopen(request)

			with urllib.request.urlopen(request) as response:
				try:
					the_page = response.read()

				except Exception  as e:
					pass
					
		except (urllib.error.URLError,urllib.error.ContentTooShortError,urllib.error.HTTPError) as e:
				print ("CANNOT GET  URL:"  + str(tmp_url))		
				sys.exit(0)

		if (len(the_page) > 1):
			filename = tmp_url.replace(",","_")
			tmp_data = json.loads(client.send_url(the_page,"url").text)
			if ("submit_id" in tmp_data):
				submit_id = tmp_data['submit_id']
				if (submit_id > 0) :
					filename = tmp_url.replace(",","_")
					check_submission_results (submit_id,filename)
				else:
					print ("url:" + str(tmp_url) , "NO RESULTS")		
		else:
			print ("url:" + str(tmp_url) , "NO RESULTS")		

	else:
		the_page = str.encode(tmp_url)
		if (len(the_page) > 1):
			filename = tmp_url.replace(",","_")
			tmp_data = json.loads(client.send_url(the_page,"url").text)
			if ("submit_id" in tmp_data):
				submit_id = tmp_data['submit_id']
				if (submit_id > 0) :
					filename = tmp_url.replace(",","_")
					check_submission_results (submit_id,"url")
				else:
					print ("url:" + str(tmp_url) , "NO RESULTS")		
		else:
			print ("url:" + str(tmp_url) , "NO RESULTS")		


def getpreferredencoding(do_setlocale = True):
	return "utf-8"

def main(argv):
	locale.getpreferredencoding = getpreferredencoding

	urllib3.disable_warnings()

	parser = argparse.ArgumentParser(description='Test upload files to FortiAi and fortisandbox tool')

	parser.add_argument("-f","--file",   type=str, help="Filename to submit")
	parser.add_argument("-u","--url",   type=str, help="Filename to submit")
	parser.add_argument("-d","--depth",  type=int, help="Depth for url analysis, default 0 (just the url page), if depth not defined, maxdepth 3")

	args = parser.parse_args()

	if ( not (args.file or args.url)):
		parser.print_help()
		sys.exit(0)

	if (args.depth):
		depth = args.depth
	else:
		depth = 0
		
	if (depth > 3):
		depth = 3
	
	if (args.file):
		client = FAIApiClient_file(host)
		tmp_data = json.loads(client.send_file(args.file).text)
		if ("submit_id" in tmp_data):
			submit_id = tmp_data['submit_id']
			if (submit_id > 0) :
				check_submission_results (submit_id,args.file)
			else:
				print ("filename:" + str(args.file) , "NO RESULTS")		

	if (args.url):

		if (depth == 0):

			parse_url (args.url)
		else:
		
			list_of_url_to_parse = ""
			list_url = crawl (args.url,depth)

			for i in list_url:
				tmp_list = i
				for j in tmp_list:
					parse_url(j)


# Example command: python FAI_Client.py <fai_ip> <api key> <sample file path>
if __name__ == '__main__':
    main(sys.argv)

Appendix B - Sample script to submit files

This is a sample script in python to submit files and retrieve results from FortiNDR.

#!/usr/bin/python3

# Version 1.0
# par Fortinet
# Jan 2021

import os
import requests
import getopt
import argparse
import simplejson as json
from base64 import b64encode, b64decode
import urllib3
import sys
import gzip
import subprocess
import urllib.request
import validators
from fake_useragent import UserAgent
import locale
from bs4 import BeautifulSoup
import requests


host = "IP"
AI_api_key = "API_KEY"

# Please be careful when regenerate api token. Once new token has been generated, old one will be invalid.


class FAIApiClient_file():

	def __init__(self, url):
		self.url = 'https://' + url + '/api/v1/files?access_token=' + AI_api_key
		self.body = {"file_name": "",
			     "file_content": "",
			     "password": ""}

	def _handle_post(self, data):
		"""
		POST JSON request..

		@type data: dict
		@param data: JSON request data.
		@rtype: HttpResponse
		@return: JSON response data.
		"""
		response = requests.post(self.url, data=json.dumps(data), verify=False)

		return response

	def _load_file_for_upload(self, path_to_file, test_input, filename=''):
		"""
		Load file contents into input mapping.

		@type path_to_file: basestring
		@param path_to_file: files absolute path.
		@type test_input: dict
		@param test_input: JSON request data.
		@type filename: basestring
		@param filename: filename override optional param.
		@rtype: dict
		@return: updated JSON request dict.
		"""
		with open(path_to_file, 'rb') as f:
			data = f.read()
		filename = os.path.basename(path_to_file) if not filename else filename
		test_input['file_name'] = b64encode(filename.encode('utf-8'))
		test_input['file_content'] = b64encode(data)
		test_input['password'] = "1"
		return test_input

	def send_file(self, OVERRIDE_FILE = '../Resources/samples.zip'):
		# NOTE: 'OVERRIDE_FILE' should be the absolute path to the file.
		#       When submitting a file via API the noted file ('OVERRIDE_FILE')
		#       will be used as an OVERRIDE.
		test_input = self.body
		test_input = self._load_file_for_upload(OVERRIDE_FILE, test_input)
		response = self._handle_post(test_input)
		return response

	def _load_memory_for_upload(self, text_data, test_input, filename=''):
		"""
		Load file contents into input mapping.

		@type path_to_file: basestring
		@param path_to_file: files absolute path.
		@type test_input: dict
		@param test_input: JSON request data.
		@type filename: basestring
		@param filename: filename override optional param.
		@rtype: dict
		@return: updated JSON request dict.
		"""
		
		tmp_str = ""  

		data = b64encode(text_data)

		test_input['file_name'] = b64encode(filename.encode('utf-8'))
		test_input['file_content'] = data
		test_input['password'] = "1"
		return test_input

	def send_url(self, url_page,filename):
		# NOTE: 'OVERRIDE_FILE' should be the absolute path to the file.
		#       When submitting a file via API the noted file ('OVERRIDE_FILE')
		#       will be used as an OVERRIDE.
		test_input = self.body
		test_input = self._load_memory_for_upload(url_page, test_input,filename)
		response = self._handle_post(test_input)
		return response
		
def crawl(url,depth):

	count = 3  # amount of urls in each level
	url_list_depth = [[] for i in range(0, depth + 1)]
	url_list_depth[0].append(url)
	for depth_i in range(0, depth):
		for links in url_list_depth[depth_i]:
			valid = True
			try:
				response = requests.get(links,verify=False)
				
			except (requests.exceptions.InvalidSchema,requests.exceptions.MissingSchema,requests.exceptions.SSLError) as e:
				valid = False
				
			if (valid):
				soup = BeautifulSoup(response.text, 'html.parser')
				tags = soup.find_all('a')
				for link in tags:
					url_new = link.get('href')
					flag = False
					for item in url_list_depth:
						for l in item:
							if url_new == l:
								flag = True

					if url_new is not None and "http" in url_new and flag is False:
						url_list_depth[depth_i + 1].append(url_new)
						#print(links, "->", url_new)

			else:
				parse_url (links)


	return (url_list_depth)
	
def load_file_for_upload(path_to_file):
	
	with open(path_to_file, 'rb') as f:
		data = f.read()

	return gzip.compress(data)

def check_file_id(host, file_id):
	data = ""
	results_output	= ""

	tmp_url = "https://" + str(host) + "/api/v1/verdict?access_token=" + str(AI_api_key) + "&fileid=" + str(file_id)
	command= "curl -k -X GET \""+ tmp_url  + "\"  -H \"Content-Type: application/json\" "
	
	try:
		results_output = subprocess.check_output(command, shell=True)
		data =  json.loads(results_output)
		
	except subprocess.CalledProcessError as e:

		print(e)
		sys.exit(0)

	return (data)

def check_submission_results (submit_id,filename):
	data = ""
	results_output	= ""

	tmp_url = "https://" + str(host) + "/api/v1/verdict?access_token=" + str(AI_api_key) + "&sid=" + str(submit_id)
	command= "curl -k -X GET \""+ tmp_url  + "\"  -H \"Content-Type: application/json\" "
	
	try:
		results_output = subprocess.check_output(command, shell=True)
		data =  json.loads(results_output)

		if (len(data) > 0):
			for key in data:
				if (key == "results"):
				   tmp_data = data[key]
				   for key, value in tmp_data.items():
					if (key == "fileids"):
						if (len(value) > 0):
						   for i in range(0,len(value)):
							file_id = value[i]
							new_data  = "DATA_IN_PROCESS"
							stop = True
							i = 1
							while stop:
							   new_data = check_file_id(host, file_id)
							   tmp_check = str(new_data)
							   i = i + 1
										
							   if (not ("DATA_IN_PROCESS" in tmp_check)):
							      stop = False
							   elif (i == 50 ):
							      stop = False
							      break

									
							results_metadata = "filename:" + str(filename)
							if (len(new_data) > 0):
							   for key in data:
							      if (key == "results"):
								try:
								   tmp_data = new_data[key]
								   for key, value in tmp_data.items():
							  	      results_metadata = results_metadata + "," 
+ str(key) + ":" + str(value)
								except KeyError as e:
								   next
													
							print (results_metadata)

							else:
								print ("filename:" + str(filename)  + ",NO RESULTS")		
	except subprocess.CalledProcessError as e:

		sys.exit(0)

def parse_url (tmp_url):

	client = FAIApiClient_file(host)

	if (validators.url(tmp_url)):
		ua = UserAgent()
		the_page = ""
		
		try:
			request = urllib.request.Request(tmp_url, data=None, headers={'User-Agent':  str(ua)})
			response = urllib.request.urlopen(request)

			with urllib.request.urlopen(request) as response:
				try:
					the_page = response.read()

				except Exception  as e:
					pass
					
		except (urllib.error.URLError,urllib.error.ContentTooShortError,urllib.error.HTTPError) as e:
				print ("CANNOT GET  URL:"  + str(tmp_url))		
				sys.exit(0)

		if (len(the_page) > 1):
			filename = tmp_url.replace(",","_")
			tmp_data = json.loads(client.send_url(the_page,"url").text)
			if ("submit_id" in tmp_data):
				submit_id = tmp_data['submit_id']
				if (submit_id > 0) :
					filename = tmp_url.replace(",","_")
					check_submission_results (submit_id,filename)
				else:
					print ("url:" + str(tmp_url) , "NO RESULTS")		
		else:
			print ("url:" + str(tmp_url) , "NO RESULTS")		

	else:
		the_page = str.encode(tmp_url)
		if (len(the_page) > 1):
			filename = tmp_url.replace(",","_")
			tmp_data = json.loads(client.send_url(the_page,"url").text)
			if ("submit_id" in tmp_data):
				submit_id = tmp_data['submit_id']
				if (submit_id > 0) :
					filename = tmp_url.replace(",","_")
					check_submission_results (submit_id,"url")
				else:
					print ("url:" + str(tmp_url) , "NO RESULTS")		
		else:
			print ("url:" + str(tmp_url) , "NO RESULTS")		


def getpreferredencoding(do_setlocale = True):
	return "utf-8"

def main(argv):
	locale.getpreferredencoding = getpreferredencoding

	urllib3.disable_warnings()

	parser = argparse.ArgumentParser(description='Test upload files to FortiAi and fortisandbox tool')

	parser.add_argument("-f","--file",   type=str, help="Filename to submit")
	parser.add_argument("-u","--url",   type=str, help="Filename to submit")
	parser.add_argument("-d","--depth",  type=int, help="Depth for url analysis, default 0 (just the url page), if depth not defined, maxdepth 3")

	args = parser.parse_args()

	if ( not (args.file or args.url)):
		parser.print_help()
		sys.exit(0)

	if (args.depth):
		depth = args.depth
	else:
		depth = 0
		
	if (depth > 3):
		depth = 3
	
	if (args.file):
		client = FAIApiClient_file(host)
		tmp_data = json.loads(client.send_file(args.file).text)
		if ("submit_id" in tmp_data):
			submit_id = tmp_data['submit_id']
			if (submit_id > 0) :
				check_submission_results (submit_id,args.file)
			else:
				print ("filename:" + str(args.file) , "NO RESULTS")		

	if (args.url):

		if (depth == 0):

			parse_url (args.url)
		else:
		
			list_of_url_to_parse = ""
			list_url = crawl (args.url,depth)

			for i in list_url:
				tmp_list = i
				for j in tmp_list:
					parse_url(j)


# Example command: python FAI_Client.py <fai_ip> <api key> <sample file path>
if __name__ == '__main__':
    main(sys.argv)