Changeset 9988 in genesis for tools


Ignore:
Timestamp:
Feb 18, 2012, 8:56:43 PM (13 years ago)
Author:
rick
Message:

Use multiprocessing to speedup discovery and running by 10x

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tools/get-link-traffic.py

    r9987 r9988  
    1010import sys
    1111import yaml
     12from multiprocessing import Process, Manager, Pool, Queue, freeze_support
     13
    1214logging.basicConfig(level=logging.DEBUG)
    1315logger = logging.getLogger()
    1416
     17
     18
    1519DATASTORE='store.yaml'
    16 
    17 try:
    18   store = yaml.load(open(DATASTORE,'r'))
    19 except IOError:
    20   store = { 'snmp' : {}, 'traffic' : {}, 'uptime' : {}}
    21   pass
    2220
    2321class ConnectError(Exception):
    2422  pass
    2523
    26 def find_right_snmp_ip(data):
    27   for k,v in data.iteritems():
    28     if k.startswith('iface_'):
    29       ip = v['ip'].split('/')[0]
    30       logger.info("Trying ip %s", ip)
    31       p = subprocess.Popen("snmpwalk -t 1 -r 1 -Oq -c public -v2c %s uptime" % ip,
    32         shell=True,
    33         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    34       (stdout, stderr) = p.communicate()
    35       if p.returncode == 0:
    36         return ip
    37       else:
    38         logger.error(stderr.strip())
    39   raise None
    40 
    41 def get_snmp_stats(ip,target):
     24def get_snmp_stats(logger, ip,target):
    4225  p = subprocess.Popen("snmpwalk -t 1 -r 1 -Oq -c public -v2c %s %s" % (ip, target),
    4326    shell=True,
     
    5437  return r
    5538
    56 def get_snmp_value(ip, target):
     39def get_snmp_value(logger, ip, target):
    5740  p = subprocess.Popen("snmpget -t 1 -r 1 -Ovt -c public -v2c %s %s" % (ip, target),
    5841    shell=True,
     
    6447  return stdout.strip()
    6548
     49def find_right_snmp_ip(logger, data):
     50  for k,v in data.iteritems():
     51    if k.startswith('iface_'):
     52      ip = v['ip'].split('/')[0]
     53      logger.info("Trying ip %s", ip)
     54      try:
     55        uptime = get_snmp_value(logger, ip, 'DISMAN-EVENT-MIB::sysUpTimeInstance')
     56        return ip
     57      except ConnectError:
     58        pass
     59  return None
     60
    6661   
    6762
     
    7166  ff = ''
    7267
    73 try:
     68def process_file(logger, store, nf, rescan):
     69  data = yaml.load(open(nf,'r'))
     70  nodename = data['nodename']
     71
     72  if store['snmp'].has_key(nodename):
     73    ip = store['snmp'][nodename]
     74    if not ip and rescan:
     75      logger.info("Re-scanning for new valid IP")
     76      ip = find_right_snmp_ip(logger, data)
     77      store['snmp'][nodename] = ip
     78    else:
     79      try:
     80        uptime = get_snmp_value(logger, ip, 'DISMAN-EVENT-MIB::sysUpTimeInstance')
     81      except ConnectError:
     82        logger.info("Re-scanning for new valid IP")
     83        ip = find_right_snmp_ip(logger, data)
     84        store['snmp'][nodename] = ip
     85     
     86  else:
     87    logger.info("Running discovery for %s", nodename)
     88    ip = find_right_snmp_ip(logger, data)
     89    store['snmp'][nodename] = ip
     90
     91  if ip == None:
     92    logger.error("No valid ip found for node %s", nodename)
     93    return
     94 
     95  logger.info("Processing %s via %s", nodename, ip)
     96  target = 'IF-MIB::ifDescr'
     97 
     98  try:
     99    iface = get_snmp_stats(logger, ip, 'IF-MIB::ifDescr')
     100    ifout = get_snmp_stats(logger, ip, 'IF-MIB::ifOutOctets')
     101    ifin = get_snmp_stats(logger, ip, 'IF-MIB::ifInOctets')
     102
     103    uptime = get_snmp_value(logger, ip, 'DISMAN-EVENT-MIB::sysUpTimeInstance')
     104    store['uptime'][nodename] = int(uptime)
     105   
     106    traffic = {}
     107    for i,f in iface.iteritems():
     108      traffic[f] = (int(ifin[i]), int(ifout[i]))
     109    store['traffic'][nodename] = traffic
     110  except ConnectError:
     111    logger.error("Unable to get all data")
     112    pass
     113
     114def worker(i, input, m_store):
     115  logger = logging.getLogger('Worker%s' % i)
     116  logger.info("Worker")
     117  for (nf, rescan) in iter(input.get, 'STOP'):
     118    process_file(logger, m_store, nf, rescan)
     119  logger.info("END")
     120
     121if __name__ == '__main__':
     122  freeze_support()
     123  task_queue = Queue()
     124  manager = Manager()
     125
     126  try:
     127    store = yaml.load(open(DATASTORE,'r'))
     128  except IOError:
     129    store = { 'snmp' : {}, 'traffic' : {}, 'uptime' : {}}
     130    pass
     131  m_store = manager.dict(store)
     132
     133  NUMBER_OF_PROCESSES = 10
     134  RESCAN = True
     135  plist = {}
     136  for i in range(NUMBER_OF_PROCESSES):
     137    plist[i] = Process(target=worker, args=(i, task_queue,m_store))
     138    plist[i].start()
     139
    74140  for nf in sorted(glob.glob('nodes/*%s*/wleiden.yaml' % ff)):
    75     data = yaml.load(open(nf,'r'))
    76     nodename = data['nodename']
    77    
    78     if store['snmp'].has_key(nodename):
    79       ip = store['snmp'][nodename]
    80     else:
    81       logger.info("Running discovery for %s", nodename)
    82       ip = find_right_snmp_ip(data)
    83       store['snmp'][nodename] = ip
     141    task_queue.put((nf, RESCAN))
     142 
     143  for i in range(NUMBER_OF_PROCESSES):
     144    task_queue.put('STOP')
    84145
    85     if ip == None:
    86       logger.error("No valid ip found for node %s", nodename)
    87       continue
    88    
    89     logger.info("Processing %s via %s", nodename, ip)
    90     target = 'IF-MIB::ifDescr'
    91    
    92     try:
    93       iface = get_snmp_stats(ip, 'IF-MIB::ifDescr')
    94       ifout = get_snmp_stats(ip, 'IF-MIB::ifOutOctets')
    95       ifin = get_snmp_stats(ip, 'IF-MIB::ifInOctets')
    96 
    97       uptime = get_snmp_value(ip, 'DISMAN-EVENT-MIB::sysUpTimeInstance')
    98       store['uptime'][nodename] = int(uptime)
    99      
    100       traffic = {}
    101       for i,f in iface.iteritems():
    102         traffic[f] = (int(ifin[i]), int(ifout[i]))
    103       store['traffic'][nodename] = traffic
    104     except ConnectError:
    105       logger.error("Unable to get all data")
    106       pass
    107 except KeyboardInterrupt:
    108   pass
    109 
    110 yaml.dump(store,open(DATASTORE,'w'))
     146  for i in range(NUMBER_OF_PROCESSES):
     147    plist[i].join()
    111148
    112149
     150  yaml.dump(store,open(DATASTORE,'w'))
     151
     152
     153
Note: See TracChangeset for help on using the changeset viewer.