Last active
November 16, 2019 00:09
-
-
Save remingtonc/cce84c0508f1834989881a7000f43796 to your computer and use it in GitHub Desktop.
Download MIBs from the Cisco FTP server and compile them into JSON using pysmi.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import re | |
| import os | |
| import sys | |
| import logging | |
| from io import StringIO | |
| from ftplib import FTP | |
| from pysmi.reader import FileReader | |
| from pysmi.searcher import AnyFileSearcher, StubSearcher | |
| from pysmi.writer import FileWriter | |
| from pysmi.parser import SmiV1CompatParser | |
| from pysmi.codegen import JsonCodeGen | |
| from pysmi.compiler import MibCompiler | |
| from pysmi import error | |
| from pysmi import debug | |
class SNMPPopulator:
    """Download SNMP MIB files over FTP and compile them to JSON with pysmi.

    The class-level ``None`` attributes are placeholders only; ``__init__``
    assigns the real per-instance values.
    """

    protocol = None  # URL scheme, only used by gen_full_hostpath()
    host = None  # FTP server host name
    base_path = None  # remote directory that holds the MIB files
    local_mib_dir = None  # local directory the MIB files are written to
    local_json_dir = None  # local directory the JSON output is written to

    def __init__(self, protocol='ftp', host='ftp.cisco.com', base_path='pub/mibs/v2',
                 local_mib_dir='mibs/', local_json_dir='json/',
                 abs_local_dir=False):
        """Store the remote location and resolve the local directories.

        Args:
            protocol: URL scheme reported by ``gen_full_hostpath``.
            host: FTP server to connect to.
            base_path: Remote directory containing the MIB files.
            local_mib_dir: Directory for downloaded MIB files.
            local_json_dir: Directory for generated JSON files.
            abs_local_dir: When true, the two local directories are used
                exactly as given; otherwise they are resolved relative to the
                directory containing this source file.
        """
        self.protocol = protocol
        self.host = host
        self.base_path = base_path
        if abs_local_dir:
            self.local_mib_dir = local_mib_dir
            self.local_json_dir = local_json_dir
        else:
            # Only touch __file__ when it is actually needed, so the class is
            # still usable with absolute directories where __file__ is unset.
            local_dir = os.path.dirname(os.path.abspath(__file__))
            self.local_mib_dir = os.path.join(local_dir, local_mib_dir)
            self.local_json_dir = os.path.join(local_dir, local_json_dir)

    @staticmethod
    def _select_mib_filenames(filenames, exclude_mibs):
        """Return the base names from *filenames* that end in '.my' and are not excluded.

        Directory components are stripped so that listing entries of the form
        ``some/dir/NAME.my`` are handled the same as plain ``NAME.my`` and can
        never point outside the local target directory.
        """
        excluded = set(exclude_mibs) if exclude_mibs else set()
        selected = []
        for filename in filenames:
            name = os.path.basename(filename)
            if name.endswith('.my') and name not in excluded:
                selected.append(name)
        return selected

    def gen_full_hostpath(self):
        """Return the remote location formatted as ``protocol://host/base_path``."""
        return '{0}://{1}/{2}'.format(self.protocol, self.host, self.base_path)

    def download_mibs(self, specific_mibs=None, exclude_mibs=('test-mib.my',), refresh=False):
        """Download '.my' MIB files from the FTP server into ``local_mib_dir``.

        Args:
            specific_mibs: Iterable of file names to fetch. When ``None`` the
                remote directory is listed with ``NLST`` instead.
            exclude_mibs: File names that must never be downloaded.
            refresh: When false, files that already exist locally are skipped.

        Returns:
            ``True`` once every selected file has been processed.

        Raises:
            Any error raised by ``ftplib`` or by local file I/O; a partially
            written download is removed before the error propagates.
        """
        print(
            'Downloading MIBs from {0}/{1} to {2}'.format(
                self.host, self.base_path, self.local_mib_dir
            )
        )
        with FTP(self.host) as ftp:
            ftp.login()
            if specific_mibs is None:
                remote_listing = []
                ftp.retrlines(
                    'NLST {0}/*'.format(self.base_path),
                    callback=remote_listing.append
                )
            else:
                remote_listing = list(specific_mibs)
            # Filter before counting so the reported total and the progress
            # counter describe exactly the files that will be visited.
            mib_filenames = self._select_mib_filenames(remote_listing, exclude_mibs)
            num_mibs = len(mib_filenames)
            print('{0} MIBs to download.'.format(str(num_mibs)))
            os.makedirs(self.local_mib_dir, exist_ok=True)
            for counter, filename in enumerate(mib_filenames, start=1):
                print('MIB {0}/{1}: {2}'.format(counter, num_mibs, filename))
                local_path = os.path.join(self.local_mib_dir, filename)
                if not refresh and os.path.isfile(local_path):
                    print('Skipped.')
                    continue
                # Write to a side file and rename on success: a failed or
                # interrupted transfer must not leave a truncated MIB behind
                # that a later non-refresh run would skip as "already there".
                partial_path = local_path + '.part'
                try:
                    with open(partial_path, 'wb') as mib_file:
                        ftp.retrbinary(
                            'RETR {0}/{1}'.format(self.base_path, filename),
                            callback=mib_file.write
                        )
                except BaseException:
                    # Clean up on Ctrl-C as well; the error is re-raised.
                    if os.path.exists(partial_path):
                        os.remove(partial_path)
                    raise
                os.replace(partial_path, local_path)
        print('Finished downloading MIBs.')
        return True

    def transform_mibs_to_json(self, specific_mibs=None, exclude_mibs=('test-mib.my',)):
        """Compile MIB files from ``local_mib_dir`` into JSON in ``local_json_dir``.

        Args:
            specific_mibs: Iterable of names handed to the pysmi compiler.
                When ``None``, every '.my' file found in ``local_mib_dir`` is
                used.
            exclude_mibs: Names that must not be handed to the compiler.

        On ``pysmi.error.PySmiError`` the error is printed and the process
        exits with status -1 (behaviour kept from the original implementation).
        """
        if specific_mibs is None:
            mib_names = self._select_mib_filenames(
                os.listdir(self.local_mib_dir), exclude_mibs
            )
        else:
            # Explicit names are passed through unchanged apart from exclusion;
            # callers may supply names without the '.my' suffix.
            excluded = set(exclude_mibs) if exclude_mibs else set()
            mib_names = [name for name in specific_mibs if name not in excluded]
        try:
            mib_compiler = MibCompiler(
                SmiV1CompatParser(),
                JsonCodeGen(),
                FileWriter(self.local_json_dir).setOptions(suffix='.json')
            )
            mib_compiler.addSources(FileReader(self.local_mib_dir, recursive=True))
            mib_stubs = JsonCodeGen.baseMibs
            searchers = [
                AnyFileSearcher(self.local_json_dir).setOptions(exts=['.json']),
                StubSearcher(*mib_stubs),
            ]
            mib_compiler.addSearchers(*searchers)
            os.makedirs(self.local_json_dir, exist_ok=True)
            processed = mib_compiler.compile(
                *mib_names,
                noDeps=False,
                rebuild=True,
                genTexts=True,
                writeMibs=True,
                ignoreErrors=True
            )
            mib_compiler.buildIndex(
                processed,
                ignoreErrors=True
            )
        except error.PySmiError as exc:
            print('ERROR: {0}'.format(exc))
            sys.exit(-1)
if __name__ == '__main__':
    # Script entry point: fetch the MIB files with the default settings, then
    # compile them to JSON.
    # To trace pysmi internals, enable the following line:
    # debug.setLogger(debug.Debug('reader', 'searcher', 'compiler'))
    populator = SNMPPopulator()
    populator.download_mibs()
    populator.transform_mibs_to_json()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment