香港云主机最佳企业级服务商!

ADSL拨号VPS包含了中国大陆(联通、移动、电信)、

中国香港,国外拨号VPS。

当前位置:云主机 > python >

电信ADSL拨号VPS
联通ADSL拨号VPS
移动ADSL拨号VPS

python实现封装得到virustotal扫描结果


时间:2021-11-09 10:24 作者:admin610456


本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:

"""Fetch VirusTotal scan reports for MD5 hashes and cache them in SQLite.

``VirusTotalDatabase`` wraps a single-table SQLite file holding one row per
MD5 with one column per tracked anti-virus vendor.  ``VirusTotal`` requests
the report for one MD5 from the VirusTotal v2 HTTP API and stores the
detections through ``VirusTotalDatabase``.

The module is written to run unchanged on Python 2.6+ and Python 3.
"""
import contextlib
import logging
import os
import sys

try:
    import simplejson
except ImportError:
    # The standard-library json module exposes the same loads() call.
    import json as simplejson

try:  # Python 2 module layout
    import urllib2
    from urllib import urlencode
except ImportError:  # Python 3 module layout
    import urllib.request as urllib2
    from urllib.parse import urlencode

try:
    import sqlite3
except ImportError:
    sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. "
                     "Please verify your installation. Exiting...\n")
    sys.exit(-1)

# Sample hash; per the original author's note this one has no report on
# virustotal.com ("5248f774d2ee0a10936d0b1dc89107f1" was the other sample).
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6"

# Placeholder: replace with your own VirusTotal API key.
APIKEY = "xxxxxxxxxxxxxxxxxx"


class VirusTotalDatabase:
    """Database abstraction layer over the ``virustotal`` SQLite table."""

    # Column order of the ``virustotal`` table as created by _generate();
    # used to turn a ``SELECT *`` row into a dict.
    _COLUMNS = ("id", "md5", "Kaspersky", "McAfee", "Symantec", "Norman",
                "Avast", "NOD32", "BitDefender", "Microsoft", "Rising",
                "Panda")

    def __init__(self, db_file):
        """Open ``db_file``, creating the file and its table when missing.

        Failures are reported on stdout rather than raised; after a failed
        connection ``_cursor`` stays ``None`` and the query methods return
        ``None``.
        """
        log = logging.getLogger("Database.Init")
        self.__dbfile = db_file
        self._conn = None
        self._cursor = None

        # Generate the database only when the file does not exist yet.
        if not os.path.exists(self.__dbfile):
            if self._generate():
                print("Generated database \"%s\" which didn't"
                      " exist before." % self.__dbfile)
            else:
                print("Unable to generate database")

        try:
            self._conn = sqlite3.connect(self.__dbfile)
            self._cursor = self._conn.cursor()
        except sqlite3.Error as why:
            print("Unable to connect to database \"%s\": %s."
                  % (self.__dbfile, why))
        else:
            # Only claim a connection when one was actually established.
            log.debug("Connected to SQLite database \"%s\"." % self.__dbfile)

    def _generate(self):
        """Create the SQLite file and the ``virustotal`` table.

        Returns ``True`` when the database was created, ``False`` when the
        file already exists or its directory could not be created.
        """
        if os.path.exists(self.__dbfile):
            return False

        db_dir = os.path.dirname(self.__dbfile)
        # A bare file name has an empty dirname; os.makedirs("") would fail,
        # so only create a directory when there is one to create.
        if db_dir and not os.path.exists(db_dir):
            try:
                os.makedirs(db_dir)
            except (IOError, os.error) as why:
                print("Something went wrong while creating database "
                      "directory \"%s\": %s" % (db_dir, why))
                return False

        conn = sqlite3.connect(self.__dbfile)
        try:
            conn.execute("CREATE TABLE virustotal (\n"
                         " id INTEGER PRIMARY KEY,\n"
                         " md5 TEXT NOT NULL,\n"
                         " Kaspersky TEXT DEFAULT NULL,\n"
                         " McAfee TEXT DEFAULT NULL,\n"
                         " Symantec TEXT DEFAULT NULL,\n"
                         " Norman TEXT DEFAULT NULL,\n"
                         " Avast TEXT DEFAULT NULL,\n"
                         " NOD32 TEXT DEFAULT NULL,\n"
                         " BitDefender TEXT DEFAULT NULL,\n"
                         " Microsoft TEXT DEFAULT NULL,\n"
                         " Rising TEXT DEFAULT NULL,\n"
                         " Panda TEXT DEFAULT NULL\n"
                         ");")
            conn.commit()
        finally:
            # The setup connection is separate from the one __init__ opens.
            conn.close()
        print("create db:%s sucess" % self.__dbfile)
        return True

    def _get_task_dict(self, row):
        """Map a ``SELECT *`` row to a dict keyed by column name.

        Returns ``None`` when ``row`` is not a sequence or has fewer values
        than the table has columns.
        """
        try:
            if len(row) >= len(self._COLUMNS):
                return dict(zip(self._COLUMNS, row))
        except TypeError:
            pass
        return None

    def add_sample(self, md5, virus_dict):
        """Insert or update the row for ``md5``.

        ``virus_dict`` maps vendor names to detection names; vendors missing
        from it are stored as NULL (an update overwrites every vendor
        column).  Returns the row id on success, ``None`` when there is no
        cursor, ``md5`` is empty or the insert fails, and ``False`` when the
        lookup or the update fails.
        """
        task_id = None

        if not self._cursor:
            return None
        if not md5:
            return None

        # Vendor values in table-column order (skipping id and md5).
        detections = tuple(virus_dict.get(vendor)
                           for vendor in self._COLUMNS[2:])

        self._conn.text_factory = str
        try:
            self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ?;",
                                 (md5,))
            sample_row = self._cursor.fetchone()
        except sqlite3.OperationalError as why:
            print("sqlite3 error:%s\n" % str(why))
            return False

        if sample_row:
            try:
                sample_row = sample_row[0]
                self._cursor.execute(
                    "UPDATE virustotal SET Kaspersky=?, McAfee=?, "
                    "Symantec=?, Norman=?, Avast=?, NOD32=?, "
                    "BitDefender=?, Microsoft=?, Rising=?, Panda=? "
                    "WHERE id = ?;",
                    detections + (sample_row,))
                self._conn.commit()
                task_id = sample_row
            except sqlite3.OperationalError as why:
                print("Unable to update database: %s." % why)
                return False
        else:  # the sample is not in the database yet
            try:
                self._cursor.execute(
                    "INSERT INTO virustotal "
                    "(md5, Kaspersky, McAfee, Symantec, Norman, Avast, "
                    "NOD32, BitDefender, Microsoft, Rising, Panda) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
                    (md5,) + detections)
                self._conn.commit()
                task_id = self._cursor.lastrowid
            except sqlite3.OperationalError as why:
                print("why %s" % str(why))
                return None
            print("add_to_db:%s, task_id:%s"
                  % (str(self.__dbfile), str(task_id)))
        return task_id

    def get_sample(self):
        """Return the row with the lowest id as a dict, or ``None``.

        ``None`` is returned when there is no cursor, the query fails or
        the table is empty.
        """
        log = logging.getLogger("Database.GetTask")

        if not self._cursor:
            log.error("Unable to acquire cursor.")
            return None

        try:
            # The table has no ``added_on`` column, so order by id only;
            # ordering by the missing column made every call fail.
            self._cursor.execute("SELECT * FROM virustotal "
                                 "ORDER BY id LIMIT 1;")
        except sqlite3.OperationalError as why:
            log.error("Unable to query database: %s." % why)
            return None

        sample_row = self._cursor.fetchone()
        if sample_row:
            return self._get_task_dict(sample_row)
        return None

    def search_md5(self, md5):
        """Look up the stored row for a 32-character ``md5``.

        Returns the row as a dict, ``{}`` when nothing matches, and ``None``
        when there is no cursor, ``md5`` is not 32 characters long or the
        query fails.  If several rows match, the last one of the
        ``ORDER BY id DESC`` result is returned.
        """
        if not self._cursor:
            return None
        if not md5 or len(md5) != 32:
            return None

        try:
            self._cursor.execute("SELECT * FROM virustotal "
                                 "WHERE md5 = ? "
                                 "ORDER BY id DESC;",
                                 (md5,))
        except sqlite3.OperationalError:
            return None

        rows = self._cursor.fetchall()
        if not rows:
            return {}
        return self._get_task_dict(rows[-1])

    def close(self):
        """Close the connection; afterwards the query methods return None."""
        if self._conn is not None:
            self._conn.close()
        self._conn = None
        self._cursor = None


class VirusTotal:
    """VirusTotal report for a single MD5 hash."""

    def __init__(self, md5):
        """Fetch the report for ``md5`` and keep the detections.

        Performs an HTTP request; network and parse errors propagate.
        """
        self._virus_dict = {}
        self._md5 = md5
        self._db_file = r"./db/virustotal.db"
        # Keep the fetched report instead of discarding it, so repr() and
        # write_to_db() can use it without a second API request.
        self._virus_dict = self.get_report_dict()

    def __repr__(self):
        """Return the detections dict rendered as a string."""
        return str(self._virus_dict)

    # Backward-compatible alias: the method was originally named ``repr``.
    repr = __repr__

    def submit_md5(self, file_path):
        """Upload the file at ``file_path`` for scanning and print the reply.

        Relies on the external ``postfile`` helper module, imported lazily
        so the rest of this module works without it.
        """
        import postfile

        file_name = os.path.basename(file_path)
        host = "www.virustotal.com"
        selector = "https://www.virustotal.com/vtapi/v2/file/scan"
        fields = [("apikey", APIKEY)]
        with open(file_path, "rb") as handle:
            file_to_send = handle.read()
        files = [("file", file_name, file_to_send)]
        reply = postfile.post_multipart(host, selector, fields, files)
        print(reply)

    def get_report_dict(self):
        """Request the report for this MD5 and return its detections.

        Returns a dict mapping each vendor that flagged the sample to the
        reported name; it is empty when the reply has no body, a falsy
        ``response_code`` or no positive scans.  Network and JSON errors
        propagate to the caller.
        """
        url = "https://www.virustotal.com/vtapi/v2/file/report"
        parameters = {"resource": self._md5,
                      "apikey": APIKEY}
        # urlopen() needs bytes for POST data on Python 3.
        data = urlencode(parameters).encode("ascii")
        req = urllib2.Request(url, data)
        with contextlib.closing(urllib2.urlopen(req)) as response:
            body = response.read()

        # NOTE(review): an empty body cannot be parsed as JSON; presumably
        # the API answers like this when it has nothing to return - confirm.
        if not body:
            return {}

        response_dict = simplejson.loads(body.decode("utf-8"))
        result_dict = {}
        if response_dict.get("response_code"):  # has result
            scans_dict = response_dict.get("scans", {})
            for vendor, scan in scans_dict.items():
                if scan.get("detected"):
                    result_dict.setdefault(vendor, scan.get("result"))
        return result_dict

    def write_to_db(self):
        """Store the detections fetched by the constructor.

        Returns whatever ``VirusTotalDatabase.add_sample`` returns.
        """
        db = VirusTotalDatabase(self._db_file)
        try:
            return db.add_sample(self._md5, self._virus_dict)
        finally:
            db.close()

使用方法如下:

# Usage example: read MD5 hashes (one per line) from the configured input
# file, then fetch each hash's scan result and write it to the database.
from getVirusTotalInfo import VirusTotal

config = {'input': "inputMd5s"}

# Collect the non-empty, whitespace-stripped lines of the input file.
MD5S = []
with open(config['input'], "r") as fp:
    for line in fp:
        md5 = line.strip()
        if md5:
            MD5S.append(md5)
            print("MD5S %s" % (MD5S,))

# Get the scan result for every hash and write it to the database.
for md5 in MD5S:
    virus_total = VirusTotal(md5)
    virus_total.write_to_db()

希望本文所述对大家的Python程序设计有所帮助。

(责任编辑:admin)






帮助中心
会员注册
找回密码
新闻中心
快捷通道
域名登录面板
虚机登录面板
云主机登录面板
关于我们
关于我们
联系我们
联系方式

售前咨询:17830004266(重庆移动)

企业QQ:383546523

《中华人民共和国工业和信息化部》 编号:ICP备00012341号

Copyright © 2002 -2018 香港云主机 版权所有
声明:香港云主机品牌标志、品牌吉祥物均已注册商标,版权所有,窃用必究

云官方微信

在线客服

  • 企业QQ: 点击这里给我发消息
  • 技术支持:383546523

  • 公司总台电话:17830004266(重庆移动)
  • 售前咨询热线:17830004266(重庆移动)