Appendix B - Sample script to submit files
This is a sample script in python to submit files and retrieve results from FortiNDR.
#!/usr/bin/python3 # Version 1.0 # par Fortinet # Jan 2021 import os import requests import getopt import argparse import simplejson as json from base64 import b64encode, b64decode import urllib3 import sys import gzip import subprocess import urllib.request import validators from fake_useragent import UserAgent import locale from bs4 import BeautifulSoup import requests host = "IP" AI_api_key = "API_KEY" # Please be careful when regenerate api token. Once new token has been generated, old one will be invalid. class FAIApiClient_file(): def __init__(self, url): self.url = 'https://' + url + '/api/v1/files?access_token=' + AI_api_key self.body = {"file_name": "", "file_content": "", "password": ""} def _handle_post(self, data): """ POST JSON request.. @type data: dict @param data: JSON request data. @rtype: HttpResponse @return: JSON response data. """ response = requests.post(self.url, data=json.dumps(data), verify=False) return response def _load_file_for_upload(self, path_to_file, test_input, filename=''): """ Load file contents into input mapping. @type path_to_file: basestring @param path_to_file: files absolute path. @type test_input: dict @param test_input: JSON request data. @type filename: basestring @param filename: filename override optional param. @rtype: dict @return: updated JSON request dict. """ with open(path_to_file, 'rb') as f: data = f.read() filename = os.path.basename(path_to_file) if not filename else filename test_input['file_name'] = b64encode(filename.encode('utf-8')) test_input['file_content'] = b64encode(data) test_input['password'] = "1" return test_input def send_file(self, OVERRIDE_FILE = '../Resources/samples.zip'): # NOTE: 'OVERRIDE_FILE' should be the absolute path to the file. # When submitting a file via API the noted file ('OVERRIDE_FILE') # will be used as an OVERRIDE. test_input = self.body test_input = self._load_file_for_upload(OVERRIDE_FILE, test_input) response = self._handle_post(test_input) return response def _load_memory_for_upload(self, text_data, test_input, filename=''): """ Load file contents into input mapping. @type path_to_file: basestring @param path_to_file: files absolute path. @type test_input: dict @param test_input: JSON request data. @type filename: basestring @param filename: filename override optional param. @rtype: dict @return: updated JSON request dict. """ tmp_str = "" data = b64encode(text_data) test_input['file_name'] = b64encode(filename.encode('utf-8')) test_input['file_content'] = data test_input['password'] = "1" return test_input def send_url(self, url_page,filename): # NOTE: 'OVERRIDE_FILE' should be the absolute path to the file. # When submitting a file via API the noted file ('OVERRIDE_FILE') # will be used as an OVERRIDE. test_input = self.body test_input = self._load_memory_for_upload(url_page, test_input,filename) response = self._handle_post(test_input) return response def crawl(url,depth): count = 3 # amount of urls in each level url_list_depth = [[] for i in range(0, depth + 1)] url_list_depth[0].append(url) for depth_i in range(0, depth): for links in url_list_depth[depth_i]: valid = True try: response = requests.get(links,verify=False) except (requests.exceptions.InvalidSchema,requests.exceptions.MissingSchema,requests.exceptions.SSLError) as e: valid = False if (valid): soup = BeautifulSoup(response.text, 'html.parser') tags = soup.find_all('a') for link in tags: url_new = link.get('href') flag = False for item in url_list_depth: for l in item: if url_new == l: flag = True if url_new is not None and "http" in url_new and flag is False: url_list_depth[depth_i + 1].append(url_new) #print(links, "->", url_new) else: parse_url (links) return (url_list_depth) def load_file_for_upload(path_to_file): with open(path_to_file, 'rb') as f: data = f.read() return gzip.compress(data) def check_file_id(host, file_id): data = "" results_output = "" tmp_url = "https://" + str(host) + "/api/v1/verdict?access_token=" + str(AI_api_key) + "&fileid=" + str(file_id) command= "curl -k -X GET \""+ tmp_url + "\" -H \"Content-Type: application/json\" " try: results_output = subprocess.check_output(command, shell=True) data = json.loads(results_output) except subprocess.CalledProcessError as e: print(e) sys.exit(0) return (data) def check_submission_results (submit_id,filename): data = "" results_output = "" tmp_url = "https://" + str(host) + "/api/v1/verdict?access_token=" + str(AI_api_key) + "&sid=" + str(submit_id) command= "curl -k -X GET \""+ tmp_url + "\" -H \"Content-Type: application/json\" " try: results_output = subprocess.check_output(command, shell=True) data = json.loads(results_output) if (len(data) > 0): for key in data: if (key == "results"): tmp_data = data[key] for key, value in tmp_data.items(): if (key == "fileids"): if (len(value) > 0): for i in range(0,len(value)): file_id = value[i] new_data = "DATA_IN_PROCESS" stop = True i = 1 while stop: new_data = check_file_id(host, file_id) tmp_check = str(new_data) i = i + 1 if (not ("DATA_IN_PROCESS" in tmp_check)): stop = False elif (i == 50 ): stop = False break results_metadata = "filename:" + str(filename) if (len(new_data) > 0): for key in data: if (key == "results"): try: tmp_data = new_data[key] for key, value in tmp_data.items(): results_metadata = results_metadata + "," + str(key) + ":" + str(value) except KeyError as e: next print (results_metadata) else: print ("filename:" + str(filename) + ",NO RESULTS") except subprocess.CalledProcessError as e: sys.exit(0) def parse_url (tmp_url): client = FAIApiClient_file(host) if (validators.url(tmp_url)): ua = UserAgent() the_page = "" try: request = urllib.request.Request(tmp_url, data=None, headers={'User-Agent': str(ua)}) response = urllib.request.urlopen(request) with urllib.request.urlopen(request) as response: try: the_page = response.read() except Exception as e: pass except (urllib.error.URLError,urllib.error.ContentTooShortError,urllib.error.HTTPError) as e: print ("CANNOT GET URL:" + str(tmp_url)) sys.exit(0) if (len(the_page) > 1): filename = tmp_url.replace(",","_") tmp_data = json.loads(client.send_url(the_page,"url").text) if ("submit_id" in tmp_data): submit_id = tmp_data['submit_id'] if (submit_id > 0) : filename = tmp_url.replace(",","_") check_submission_results (submit_id,filename) else: print ("url:" + str(tmp_url) , "NO RESULTS") else: print ("url:" + str(tmp_url) , "NO RESULTS") else: the_page = str.encode(tmp_url) if (len(the_page) > 1): filename = tmp_url.replace(",","_") tmp_data = json.loads(client.send_url(the_page,"url").text) if ("submit_id" in tmp_data): submit_id = tmp_data['submit_id'] if (submit_id > 0) : filename = tmp_url.replace(",","_") check_submission_results (submit_id,"url") else: print ("url:" + str(tmp_url) , "NO RESULTS") else: print ("url:" + str(tmp_url) , "NO RESULTS") def getpreferredencoding(do_setlocale = True): return "utf-8" def main(argv): locale.getpreferredencoding = getpreferredencoding urllib3.disable_warnings() parser = argparse.ArgumentParser(description='Test upload files to FortiAi and fortisandbox tool') parser.add_argument("-f","--file", type=str, help="Filename to submit") parser.add_argument("-u","--url", type=str, help="Filename to submit") parser.add_argument("-d","--depth", type=int, help="Depth for url analysis, default 0 (just the url page), if depth not defined, maxdepth 3") args = parser.parse_args() if ( not (args.file or args.url)): parser.print_help() sys.exit(0) if (args.depth): depth = args.depth else: depth = 0 if (depth > 3): depth = 3 if (args.file): client = FAIApiClient_file(host) tmp_data = json.loads(client.send_file(args.file).text) if ("submit_id" in tmp_data): submit_id = tmp_data['submit_id'] if (submit_id > 0) : check_submission_results (submit_id,args.file) else: print ("filename:" + str(args.file) , "NO RESULTS") if (args.url): if (depth == 0): parse_url (args.url) else: list_of_url_to_parse = "" list_url = crawl (args.url,depth) for i in list_url: tmp_list = i for j in tmp_list: parse_url(j) # Example command: python FAI_Client.py <fai_ip> <api key> <sample file path> if __name__ == '__main__': main(sys.argv)