source: genesis/tools/get-link-traffic.py@ 9988

Last change on this file since 9988 was 9988, checked in by rick, 13 years ago

Use multiprocessing to speed up discovery and running by 10x

  • Property svn:executable set to *
File size: 3.9 KB
Line 
1#!/usr/bin/env python
2#
3# XXX: Parsing snmpwalk output is so wrong to do; use a proper Python library.
4#
5# Rick van der Zwet <info@rickvanderzwet.nl>
6#
7import glob
8import logging
9import subprocess
10import sys
11import yaml
12from multiprocessing import Process, Manager, Pool, Queue, freeze_support
13
# Configure the root logger to emit messages of level DEBUG and above.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()



# YAML file holding the results between runs: the main block reads it at
# start-up and writes it again once all workers have finished.
DATASTORE='store.yaml'
20
class ConnectError(Exception):
    """Raised when an SNMP command against a node does not succeed."""
23
def get_snmp_stats(logger, ip, target):
    """Walk the SNMP subtree `target` on host `ip` and return its entries.

    Runs ``snmpwalk`` (SNMP v2c, community ``public``, 1 second timeout,
    1 retry) with quick-print output (``-Oq``) and parses the result.

    logger -- logger that receives the error text when the walk fails
    ip     -- address of the host to query
    target -- symbolic name of the subtree, e.g. 'IF-MIB::ifDescr'

    Returns a dict mapping each instance index (the reported name with the
    leading `target` and its separator removed) to the value text. Both are
    strings; an empty walk yields an empty dict.

    Raises ConnectError when snmpwalk cannot be started or exits non-zero.
    """
    # An argument list (no shell) keeps address/OID text that originates from
    # the node files from ever being interpreted as shell syntax.
    cmd = ['snmpwalk', '-t', '1', '-r', '1', '-Oq', '-c', 'public', '-v2c',
           str(ip), str(target)]
    try:
        # universal_newlines gives text (not bytes) on Python 2 and 3 alike.
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, universal_newlines=True)
    except OSError as e:
        # A missing snmpwalk binary used to surface as a non-zero shell exit
        # status; keep reporting it to callers as a ConnectError.
        logger.error("Unable to run snmpwalk: %s", e)
        raise ConnectError
    (stdout, stderr) = p.communicate()
    if p.returncode != 0:
        logger.error(stderr.strip())
        raise ConnectError

    r = {}
    for line in stdout.splitlines():
        # Split once only, so values that contain spaces stay intact.
        fields = line.split(None, 1)
        if not fields:
            continue  # blank line (e.g. empty output)
        index = fields[0][len(target) + 1:]
        value = fields[1].strip() if len(fields) > 1 else ''
        r[index] = value
    return r
38
def get_snmp_value(logger, ip, target):
    """Fetch the single SNMP object `target` from host `ip`.

    Runs ``snmpget`` (SNMP v2c, community ``public``, 1 second timeout,
    1 retry) with ``-Ovt`` output formatting.

    logger -- logger that receives the error text when the query fails
    ip     -- address of the host to query
    target -- symbolic name of the object,
              e.g. 'DISMAN-EVENT-MIB::sysUpTimeInstance'

    Returns the command's standard output as a string with surrounding
    whitespace removed.

    Raises ConnectError when snmpget cannot be started or exits non-zero.
    """
    # An argument list (no shell) keeps address/OID text that originates from
    # the node files from ever being interpreted as shell syntax.
    cmd = ['snmpget', '-t', '1', '-r', '1', '-Ovt', '-c', 'public', '-v2c',
           str(ip), str(target)]
    try:
        # universal_newlines gives text (not bytes) on Python 2 and 3 alike.
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, universal_newlines=True)
    except OSError as e:
        # A missing snmpget binary used to surface as a non-zero shell exit
        # status; keep reporting it to callers as a ConnectError.
        logger.error("Unable to run snmpget: %s", e)
        raise ConnectError
    (stdout, stderr) = p.communicate()
    if p.returncode != 0:
        logger.error(stderr.strip())
        raise ConnectError
    return stdout.strip()
48
def find_right_snmp_ip(logger, data):
    """Return the first interface address of a node that answers SNMP.

    logger -- logger used for progress messages
    data   -- parsed node configuration; every key starting with 'iface_'
              is expected to map to a dict whose 'ip' entry has the form
              'address/prefix'

    Each candidate address is probed by requesting the node's sysUpTime.
    Returns the address (prefix length stripped) of the first interface
    that responds, or None when no interface does.
    """
    for k, v in data.items():
        if not k.startswith('iface_'):
            continue
        # An interface entry without an address cannot be probed; skip it
        # rather than failing the whole node with a KeyError.
        if not isinstance(v, dict) or not v.get('ip'):
            logger.debug("Skipping %s: no ip configured", k)
            continue
        ip = v['ip'].split('/')[0]
        logger.info("Trying ip %s", ip)
        try:
            # Only reachability matters here; the returned value is unused.
            get_snmp_value(logger, ip, 'DISMAN-EVENT-MIB::sysUpTimeInstance')
        except ConnectError:
            continue
        return ip
    return None
60
61
62
# Optional node-name filter taken from the first command-line argument; it is
# interpolated into the glob pattern in the main block, so the empty default
# selects every node.
ff = sys.argv[1] if len(sys.argv) > 1 else ''
67
def process_file(logger, store, nf, rescan):
    """Collect uptime and per-interface traffic counters for one node.

    logger -- logger used for progress and error messages
    store  -- mapping with 'snmp', 'traffic' and 'uptime' sub-mappings, each
              keyed by node name; results are written into it
    nf     -- path of the node's YAML file (must contain 'nodename'; the
              'iface_*' entries are used for address discovery)
    rescan -- when true, run discovery again for a node whose stored
              address is empty; when false such a node is skipped

    Returns None. On success store['uptime'][nodename] is the uptime as an
    int and store['traffic'][nodename] maps interface description to an
    (in_octets, out_octets) tuple. store['snmp'][nodename] is updated
    whenever discovery runs.

    NOTE(review): the results are written with nested item assignment
    (store['snmp'][nodename] = ...). If `store` is a multiprocessing manager
    dict that holds plain nested dicts, that assignment changes a local copy
    only -- confirm how the caller builds `store`.
    """
    # NOTE(review): yaml.load can construct arbitrary Python objects; if the
    # node files are not fully trusted this should become yaml.safe_load.
    with open(nf, 'r') as fh:
        data = yaml.load(fh)
    nodename = data['nodename']
    sysuptime = 'DISMAN-EVENT-MIB::sysUpTimeInstance'

    if nodename in store['snmp']:
        ip = store['snmp'][nodename]
        if not ip:
            # Earlier discovery found nothing: only probe again on request,
            # instead of querying a non-existent host and rescanning anyway.
            if rescan:
                logger.info("Re-scanning for new valid IP")
                ip = find_right_snmp_ip(logger, data)
                store['snmp'][nodename] = ip
        else:
            # Verify that the remembered address still answers.
            try:
                get_snmp_value(logger, ip, sysuptime)
            except ConnectError:
                logger.info("Re-scanning for new valid IP")
                ip = find_right_snmp_ip(logger, data)
                store['snmp'][nodename] = ip
    else:
        logger.info("Running discovery for %s", nodename)
        ip = find_right_snmp_ip(logger, data)
        store['snmp'][nodename] = ip

    if not ip:
        logger.error("No valid ip found for node %s", nodename)
        return

    logger.info("Processing %s via %s", nodename, ip)

    # Fetch everything first so a failing query leaves the store untouched.
    try:
        iface = get_snmp_stats(logger, ip, 'IF-MIB::ifDescr')
        ifout = get_snmp_stats(logger, ip, 'IF-MIB::ifOutOctets')
        ifin = get_snmp_stats(logger, ip, 'IF-MIB::ifInOctets')
        uptime = get_snmp_value(logger, ip, sysuptime)
    except ConnectError:
        logger.error("Unable to get all data")
        return

    try:
        store['uptime'][nodename] = int(uptime)
    except ValueError:
        logger.error("Unexpected uptime value %r for node %s",
                     uptime, nodename)

    traffic = {}
    for i, f in iface.items():
        try:
            traffic[f] = (int(ifin[i]), int(ifout[i]))
        except (KeyError, ValueError):
            # Interface without (numeric) octet counters: skip just this one.
            logger.warning("No usable counters for interface %s on %s",
                           f, nodename)
    store['traffic'][nodename] = traffic
113
def worker(i, input, m_store):
    """Process node files taken from a queue until 'STOP' is received.

    i       -- worker number, used only to name the logger ('Worker<i>')
    input   -- queue-like object whose get() returns (node_file, rescan)
               tuples, followed by the sentinel string 'STOP'
    m_store -- result store handed through to process_file

    Returns None once the sentinel has been read.
    """
    logger = logging.getLogger('Worker%s' % i)
    logger.info("Worker")
    for (nf, rescan) in iter(input.get, 'STOP'):
        try:
            process_file(logger, m_store, nf, rescan)
        except Exception:
            # One broken node file must not terminate the worker and leave
            # the rest of its share of the queue unprocessed.
            logger.exception("Failed to process %s", nf)
    logger.info("END")
120
if __name__ == '__main__':
    # Entry point: load the previous results, let a fixed pool of worker
    # processes query every matching node, then write the results back.
    freeze_support()
    task_queue = Queue()
    manager = Manager()

    # NOTE(review): yaml.load can construct arbitrary Python objects. The
    # store holds tuples, which plain yaml.dump writes with a Python-specific
    # tag, so moving to safe_load/safe_dump also means changing that format.
    try:
        with open(DATASTORE, 'r') as fh:
            store = yaml.load(fh) or {}
    except IOError:
        store = {}

    # One managed dict per section, held in a plain dict. Nested plain dicts
    # inside a single manager.dict are handed to workers as copies, so their
    # store['snmp'][node] = ... assignments would never reach this process.
    sections = ('snmp', 'traffic', 'uptime')
    m_store = dict((name, manager.dict(store.get(name) or {}))
                   for name in sections)

    NUMBER_OF_PROCESSES = 10
    RESCAN = True
    plist = {}
    for i in range(NUMBER_OF_PROCESSES):
        plist[i] = Process(target=worker, args=(i, task_queue, m_store))
        plist[i].start()

    for nf in sorted(glob.glob('nodes/*%s*/wleiden.yaml' % ff)):
        task_queue.put((nf, RESCAN))

    # One sentinel per worker so that every process leaves its loop.
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

    for i in range(NUMBER_OF_PROCESSES):
        plist[i].join()

    # Copy the shared results back into plain dicts before dumping; the
    # local `store` itself was never touched by the workers.
    for name in sections:
        store[name] = m_store[name].copy()
    with open(DATASTORE, 'w') as fh:
        yaml.dump(store, fh)
151
152
153
Note: See TracBrowser for help on using the repository browser.