Created
February 23, 2020 19:50
-
-
Save srinivas946/36befcf909093d3a91f2acea72300312 to your computer and use it in GitHub Desktop.
Read a list of Hashes available in csv file and convert them to another Hash format using Virus Total Threat Intel Feed API, and store the results by creating new csv file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<h2>Read a list of Hashes available in csv file and convert them to another Hash formats using Virus Total Threat Intel Feed API, and store the results by creating new csv file.</h2>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import requests # load requests module for API Interactions\n", | |
| "import csv # load csv module for Handling CSV Files\n", | |
| "import time # use for sleep the script for certain time" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<b>Read CSV File</b>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "def read_csv():\n", | |
| " hashes_list = []\n", | |
| " with open('hashes.csv', 'r') as csvfile: # open csv file in read mode\n", | |
| " data_reader = csv.reader(csvfile) # use reader method to read file information\n", | |
| " next(data_reader) # skip the header\n", | |
| " for row in data_reader: # loop the rows and store them in a list\n", | |
| " hashes_list.append(row[0])\n", | |
| " return hashes_list # return the list when this method gets called" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<b>Connect to Virus Total</b>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "def fetch_vt_info():\n", | |
| " hashes_list = read_csv() # get list of hashes from csv file\n", | |
| " store_hash_information = []\n", | |
| " for i in range(1, len(hashes_list)+1): # loop the csv file\n", | |
| " if i%4 != 0:\n", | |
| " params = {'apikey': '6e73a5961108621d609689836444701bf652e0c2ea2512b81e164a892bfdd187','resource': hashes_list[i]}\n", | |
| " response = requests.get('https://www.virustotal.com/vtapi/v2/file/report', params=params) # api request\n", | |
| " if response.status_code == 200:\n", | |
| " obtained_res = response.json() # get response in json format\n", | |
| " # parse the required information\n", | |
| " sha1 = obtained_res['sha1'] \n", | |
| " sha256 = obtained_res['sha256']\n", | |
| " md5 = obtained_res['md5']\n", | |
| " score = f\"{obtained_res['positives']}/{obtained_res['total']}\"\n", | |
| " # store them in the form of nested list where each inner list is represented as a row\n", | |
| " store_hash_information.append([sha1, sha256, md5, score])\n", | |
| " else:\n", | |
| " print('Script Waiting for 1 Minute')\n", | |
| " time.sleep(60)\n", | |
| " return store_hash_information\n", | |
| "\n", | |
| "fetch_vt_info()\n", | |
| " " | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<li>REST API is a Vast Concept. Learn it from Internet Resources</li>\n", | |
| "<li>To Test Results of REST API, use <a href=\"https://www.postman.com/\">Postman</a> Tool</li>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<b>Write data to csv file</b>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "def write_csv(data):\n", | |
| " with open('hash_output.csv', 'w') as csvfile:\n", | |
| " data_writer = csv.writer(csvfile)\n", | |
| " data_writer.writerow(['SHA1', 'SHA256', 'MD5', 'SCORE'])\n", | |
| " data_writer.writerows(data)\n", | |
| " return True\n", | |
| "data = fetch_vt_info()\n", | |
| "write_csv(data)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<b>Real Time Script</b>" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import csv, requests, time\n", | |
| "\n", | |
| "# ========================================\n", | |
| "# CHECK HASH VALUES USING VIRUSTOTAL\n", | |
| "# ========================================\n", | |
| "class Hash_Check:\n", | |
| " \n", | |
| " # ====================================================\n", | |
| " # LOAD REQUIRED FILES AND PARAMETERS FOR API REQUEST\n", | |
| " # ====================================================\n", | |
| " def __init__(self, read_file_path, write_file_path, api_key, api_url):\n", | |
| " self._read_file_path = read_file_path\n", | |
| " self._write_file_path = write_file_path\n", | |
| " self._api_key = api_key\n", | |
| " self._api_url = api_url\n", | |
| " \n", | |
| " # =============================================================\n", | |
| " # READ CSV FILE USING CSV MODULE AND RETURN HASH INFORMATION\n", | |
| " # =============================================================\n", | |
| " def read_csv(self):\n", | |
| " hashes_list = []\n", | |
| " with open(self._read_file_path, 'r') as csvfile:\n", | |
| " data_reader = csv.reader(csvfile) \n", | |
| " next(data_reader) \n", | |
| " for row in data_reader: \n", | |
| " hashes_list.append(row[0])\n", | |
| " return hashes_list\n", | |
| " \n", | |
| " # ===================================================\n", | |
| " # CHECK HASH INFORMATION FROM VIRUSTOTAL USING API\n", | |
| " # ===================================================\n", | |
| " def check_vt(self, hashes_list):\n", | |
| " store_hash_information = []\n", | |
| " for i in range(1, len(hashes_list)+1):\n", | |
| " if i%4 != 0:\n", | |
| " params = {'apikey': self._api_key,'resource': hashes_list[i]}\n", | |
| " response = requests.get(self._api_url, params=params)\n", | |
| " if response.status_code == 200:\n", | |
| " obtained_res = response.json()\n", | |
| " sha1 = obtained_res['sha1'] \n", | |
| " sha256 = obtained_res['sha256']\n", | |
| " md5 = obtained_res['md5']\n", | |
| " score = f\"{obtained_res['positives']}/{obtained_res['total']}\"\n", | |
| " store_hash_information.append([sha1, sha256, md5, score])\n", | |
| " break\n", | |
| " else:\n", | |
| " print('Script Waiting for 1 Minute')\n", | |
| " time.sleep(60)\n", | |
| " return store_hash_information\n", | |
| "\n", | |
| " # ====================================================\n", | |
| " # WRITE HAHES RESULT TO CSV FILE USING CSV MODULE\n", | |
| " # ====================================================\n", | |
| " def write_csv(self, data):\n", | |
| " with open(self._write_file_path, 'w') as csvfile:\n", | |
| " data_writer = csv.writer(csvfile)\n", | |
| " data_writer.writerow(['SHA1', 'SHA256', 'MD5', 'SCORE'])\n", | |
| " data_writer.writerows(data)\n", | |
| " return True\n", | |
| "\n", | |
| "# ==================================\n", | |
| "# PROGRAM EXECUTION STARTS HERE\n", | |
| "# ==================================\n", | |
| "\n", | |
| "# CREATE \"HASH_CHECK\" OBJECT CLASS\n", | |
| "hc = Hash_Check(read_file_path='hashes.csv', write_file_path='hashes_output.csv', api_key='your_api_key', api_url='https://www.virustotal.com/vtapi/v2/file/report')\n", | |
| "\n", | |
| "# INVOKE \"read_csv\" METHOD TO READ HASHES FROM CSV\n", | |
| "hashes_list = hc.read_csv()\n", | |
| "\n", | |
| "# INVOKE \"check_vt\" METHOD TO FETCH HASHES INFORMATION FROM VIRUSTOTAL\n", | |
| "hash_result = hc.check_vt(hashes_list=hashes_list)\n", | |
| "\n", | |
| "# INVOKE \"write_csv\" METHOD TO STORE HASH RESULT TO CSV \n", | |
| "confirm = hc.write_csv(data=hash_result)\n", | |
| "if confirm is True: print('File Created')\n", | |
| "else: print('Not Able to Create a File')" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<b>Learn more Real Time Scenarios related to csv - <a href=\"https://cybersecpy.in/handle-csv-files-using-python/\">cybersecpy</a></b>" | |
| ] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.8.1" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 4 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment