Сканер уязвимостей на Python или как написать сканер за 6 часов

в 8:45, , рубрики: CVE, NIST, nmap, nmap-сканирование, python, ИБ, информационная безопасность, сканер, сканер уязвимостей, ФСТЭК

Недавно мне довелось участвовать в хакатоне по информационной безопасности на научной конференции в прекрасном городе Санкт-Петербург в СПбГУТ. Одно из заданий представляло из себя написание собственного сканера уязвимостей на любом ЯП с условиями, что использование проприетарного ПО и фреймворков запрещено. Можно было пользоваться кодом и фреймворками существующих сканеров уязвимости с открытым кодом. Это задание и мое решение с моим коллегой мы и разберем в этой публикации.

Подготовительный этап

Так как нам дали на это задание буквально два дня, а эти дни были загружены до отвала, то решение писать данный сканер было принято на последнюю ночь перед сдачей отчета с исходным кодом (в принципе, как это всегда и делается). Решено было использовать для скана портов утилиту с открытым исходным кодом nmap, которая входит в kali. С помощью флага -sV мы узнали какие сервисы и их версии запущены на каждом порте хоста.

    nm = nmap.PortScanner()
    # Настроить параметры сканирования nmap
    scan_raw_result = nm.scan(hosts=network_prefix, arguments='-v -n -A')

Далее пришлось написать небольшой скрипт, чтобы он парсил эти названия сервисов с версиями. На выходе получаем списки на каждый порт у каждого хоста.

for host, result in scan_raw_result['scan'].items():
        if result['status']['state'] == 'up':
            print('#' * 17 + 'Host:' + host + '#' * 17)
            idno = 1
            for port in result['tcp']:    
                print('-' * 17 + "Детали TCP-сервера" + '[' + str(idno) + ']' + '-' * 17)
                idno += 1
                print('Номер порта TCP:' + str(port))
                
                print('положение дел:' + result['tcp'][port]['state'])
                
                print('причина:' + result['tcp'][port]['reason'])
                
                print('Дополнительная информация:' + result['tcp'][port]['extrainfo'])
                
                print('Имя:' + result['tcp'][port]['name'])
                
                cur_ver = result['tcp'][port]['version']
                print('версия:' + result['tcp'][port]['version'])
                
                print('сервис:' + result['tcp'][port]['product'])
                cur_soft_title = result['tcp'][port]['product']
                if ' ' in cur_soft_title:
                    cur_soft_title = cur_soft_title.split()[0].lower()
                if ('windows' in cur_soft_title) or ('linux' in cur_soft_title) or ('microsoft' in cur_soft_title):
                    cur_soft_title = None
                print('3 '+cur_soft_title)
                
                print('CPE:' + result['tcp'][port]['cpe'])
                
                print("Сценарий:" + result['tcp'][port]['script'])

                if cur_ver != '' and cur_soft_title != '':
                    os.system('python nist_scanner.py -s {} {}'.format(str(cur_soft_title), str(cur_ver)))
                if cur_ver and cur_soft_title:
                    BDU_check(cur_soft_title, cur_ver)

            idno = 1
            for port in result['udp']:
                print('-' * 17 + "Детали сервера UDP" + '[' + str(idno) + ']' + '-' * 17)
                idno += 1
                print('Номер порта UDP:' + str(port))
                print('state:' + result['udp'][port]['state'])
                print('reason:' + result['udp'][port]['reason'])
                print('Дополнительная информация:' + result['udp'][port]['extrainfo'])
                print('Имя:' + result['udp'][port]['name'])
                print('версия:' + result['udp'][port]['version'])
                cur_ver =result['udp'][port]['version']
                cur_soft_title = result['udp'][port]['product']
                print('сервис:' + cur_soft_title)
                if ' ' in cur_soft_title:
                    cur_soft_title = cur_soft_title.split()[0].lower()
                if 'windows' in cur_soft_title or 'linux' in cur_soft_title :
                    cur_soft_title = None
                print('CPE:' + result['udp'][port]['cpe'])
                print("script:" + result['udp'][port]['script'])
                if cur_ver != '' and cur_soft_title != '':
                    os.system('python nist_scanner.py -s {} {}'.format(str(cur_soft_title), str(cur_ver)))
                if cur_ver and cur_soft_title:
                    BDU_check(cur_soft_title, cur_ver)

Шаблон: https://russianblogs.com/article/7503575156/

А как быть дальше?

Многие на этом моменте и остались, лишь научившись вызывать nmap из питона и выдавать красивый вывод строками. Конечно, кто-то пользовался скриптом для nmap, таким как Vulscan, но тоже не увенчалось успехом, так как (как показалось мне) данный скрипт выводит любое упоминание данного сервиса в описании к CVE, независимо от версии данного сервиса.

Подумав, мы решили, что развернуть зеркала баз уязвимостей для дальнейшего парсинга - это хорошая идея (как в конце и оказалось). Скачали json NIST (https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{YEAR}.json.gz, где вместо {YEAR} подставляется год из диапазона 2002-наш год) и БДУ ФСТЭК (https://bdu.fstec.ru/files/documents/vullist.xlsx). За основу был взят парсер CVE, где выводится все CVE, в которых упоминается строчка, которую ты укажешь (https://github.com/stratosphereips/nist-cve-search-tool). Переписав полностью под себя этот код, я столкнулся с проблемой, что мне нужно точно определять подходит ли мне данная CVE или нет, в том числе и по версии самого сервиса. Я порылся в зеркале NIST'а в json-файлах и обнаружил, что у каждой cve есть ключ "cpe23Uri", где всегда точно можно явно найти название сервиса, про которое идет речь в данной CVE, а также "versionEndExcluding" и "versionStartExcluding", где написаны с какой версии данная уязвимость и по какую соответственно.

Скрин одного из cve по osquery
Скрин одного из cve по osquery

Это и решено было парсить.

def search(j, s, v): # j-json, s-name of service, v-verison of service
    i = 0
    regex = re.compile(f'({s})', re.I)
    for entry in j['CVE_Items']:
        if 'cve' in entry:
            desc = entry['configurations']['nodes']
            for d in desc:
                for cpe in d['cpe_match']:
                    if regex.search(cpe['cpe23Uri']) != None:
                        if 'versionEndExcluding' in cpe and version.parse(v) < version.parse(cpe['versionEndExcluding']):
                            if 'versionStartExcluding' in cpe and version.parse(v) > version.parse(cpe['versionStartExcluding']):
                                CVEs.append(entry)
                                i += 1
                                break
                            else:
                                CVEs.append(entry)
                                i += 1
                                break
        if i == count:
            break

Пока я занимался парсингом json, мой коллега занимался парсингом csv-файла уязвимостей от ФСТЭК, чтобы на выходе были CVE как от БДУ ФСТЭК, так и от NIST.

Зайдя на сайт БДУ ФСТЭК находим возможность скачать данные в виде xlsx файла. Для удобства дальнейшей работы и парсинга переформатируем файл в формат csv. И используя уже имеющиеся библиотеки для работы с csv, разбираем БДУ и настраиваем логику, чтобы выводились верные потенциальные уязвимости.

def BDU_check(cur_soft_title, cur_ver):
    with open('vullist_1.csv', encoding='utf-8') as csvfile:
        # print(123)
        reader = csv.DictReader(csvfile)
        i = 0
        for row in reader:
            soft_title = str(row['Название ПО'])
            versions = row['Версия ПО']
            if cur_soft_title.lower() in soft_title.lower():
                cve_row = row['Идентификаторы других систем описаний уязвимости']
                for current_service_version in versions.split(','):

                    # нижняя граница версии
                    if 'от' in current_service_version:
                        begin_version = re.search('[^d.]?[d.]+[^d.]?', str(current_service_version)+' ')[0]
                        while re.search('[d]', begin_version[0]) is None:
                            begin_version = begin_version[1:]
                        while re.search("[d]", begin_version[-1]) is None:
                            begin_version = begin_version[:-1]

                    if 'до' in current_service_version:
                        end_version = re.search('[^d.]?[d.]+[^d.]?', str(current_service_version)+ ' ')
                        end_version = end_version[0]

                        while re.search('[^d]', end_version[0]):
                            end_version = end_version[1:]
                        while re.search('[^d]', end_version[-1]):
                            end_version = end_version[:-1]

                        cur_ver = re.search('[^d.]?[d.]+[^d.]?', str(cur_ver)+ ' ')
                        cur_ver = cur_ver[0]

                        while re.search('[^d]', cur_ver[0]):
                             end_version = end_version[1:]
                        while re.search('[^d]', cur_ver[-1]):
                             cur_ver = cur_ver[:-1]
                    flag_begin_vesion = (begin_version and (not end_version) and (version.parse(begin_version) <= version.parse(cur_ver)))
                    flag_end_vesion = ((not begin_version) and (end_version) and (version.parse(cur_ver) <= version.parse(end_version)))
                    flag_both_vesion = (begin_version and (end_version) and (version.parse(begin_version) <= version.parse(cur_ver)) and (version.parse(cur_ver) <= version.parse(end_version)))
                    
                    if flag_begin_vesion or flag_end_vesion or flag_both_vesion:
                        print('Идентификатор         : ' + str(row['Идентификатор']))
                        print('CVE                   : ' + str(cve_row))
                        print('Название ПО             : '+ str(row['Название ПО']))
                        print('Версия ПО             : '+ str(row['Версия ПО']))
                        print('Версия ПО общ. признак: '+ str(current_service_version))
                        print('Описание уязвимости   : ' + str(row['Описание уязвимости']))
                        print('----------------------------n')
                        break
            i += 1

Логика проверки версии ПО уязвимости для найденного сервиса была не идеальна, но благодаря этому отбросили порядка 80-90% неподходящих и неактуальных уязвимостей

В конце, когда мы соединили наши модули (не без трудностей, конечно), у нас и вышел сканер, с помощью которого мы смогли победить в хакатоне.

Ссылки:

Git-репозиторий с исходниками: https://github.com/mksmp/vulnerability_scanner

Соавтор:

https://github.com/aleksey2101

@aleksey2_1

Автор:
mksmpn

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js