Changeset 9557


Ignore:
Timestamp:
Aug 25, 2011, 10:24:36 AM (13 years ago)
Author:
rick
Message:

Re-did importing to make it roughly 100x times faster, by using proper database imports.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/django_gheat/gheat/management/commands/import_droidstumbler.py

    r9552 r9557  
    1313from django.db import connection, transaction
    1414from django.db.utils import IntegrityError
     15from _mysql_exceptions import OperationalError
    1516from optparse import OptionParser, make_option
    1617from gheat.models import *
     
    2223import sys
    2324
    24 def user_feedback(output, count, dot_step, summary_report):
    25   if (count % summary_report) == 0:
    26     output.write(str(count))
    27   elif (count % dot_step) == 0:
    28     output.write(".")
    29   output.flush()
    30   return count + 1
     25logger = logging.getLogger(__name__)
     26logger.setLevel(logging.DEBUG)
     27
     28def bulk_sql(sql_table, sql_values):
     29  if len(sql_values) == 0:
     30    raise ValueError, "No data to import"
     31
     32  cursor = connection.cursor()
     33  try:
     34    sql = "INSERT INTO %s VALUES %s" % (sql_table, ','.join(sql_values))
     35    count = cursor.execute(sql)
     36    transaction.commit_unless_managed()
     37  except OperationalError, e:
     38    logger.error("%s - %s ", sql_table, sql_values[0])
     39    raise
     40  except IntegrityError, e:
     41    logger.error("Unable to import - %s" %  e)
     42  return count
    3143
    3244
    33 def import_droidstumbler(location, meetrondje, gebruiker, kaart, email, datum,show_progres=False,output=sys.stdout,bulk_import=True):
     45def import_droidstumbler(filename, meetrondje):
    3446  """ Import all points, return tuple with summary"""
    35   g, created = Gebruiker.objects.get_or_create(naam=gebruiker , email=email)
    36   a, created = Apparatuur.objects.get_or_create(kaart=kaart)
    37   mr, created = MeetRondje.objects.get_or_create(datum=datum , naam=meetrondje , gebruiker=g , apparatuur=a)
    38   if not created:
    39     logging.error("Meetrondje '%s' already imported" % mr)
    40     sys.exit(1)
    4147
    42   meting_count = 0
    43   new_ap_count = 0
    44   ap_cache = {}
     48  # Open file for reading
     49  if filename.endswith('.gz'):
     50    fh = gzip.open(filename,'rb')
     51  else:
     52    fh = open(filename,'rb')
     53  csvfile = csv.reader(fh, delimiter=',')
     54
     55  #Various statistics
     56  counters = {'meting_added' : 0, 'meting_total' : 0, 'ap_added' : 0, 'ap_total' : 0}
     57
     58  # Temponary holders
    4559  meting_pool = {}
    46 
    47   if show_progres: output.write('#INFO: Pre-Processing file: ')
    48   if location.endswith('.gz'):
    49     fh = gzip.open(location,'rb')
    50   else:
    51     fh = open(location,'rb')
    52   csvfile = csv.reader(fh, delimiter=',')
    53   count = 0
     60  ap_pool = {}
     61  # Process file, preparing new access points and measurements
    5462  for row in csvfile:
    55     if show_progres: count = user_feedback(output, count, 100, 1000)
    5663    try:
    5764      epoch, msg_type, lat, lon, accuracy, ssid, bssid, level, frequency, capabilities = row
    5865    except ValueError:
    59       logging.error("Unable to parse line:%i '%s'" % (csvfile.line_num, row))
     66      # Known error, please ignore
     67      if row[1] == 'gps' and len(row) == 12: continue
     68      logger.error("Unable to parse line:%i '%s'" % (csvfile.line_num, row))
    6069      continue
    6170    if msg_type == "data" and lat and lon:
    62       if not ap_cache.has_key(bssid):
    63         ap_cache[bssid], created = Accespoint.objects.get_or_create(mac=bssid, ssid=ssid, encryptie=capabilities)
    64         if created: new_ap_count += 1
     71      counters['meting_total'] += 1
     72      if not ap_pool.has_key(bssid):
     73        encryption = 'WPA' in capabilities or 'WEP' in capabilities
     74        ap_pool[bssid] = (ssid, encryption)
    6575
    6676      # We store the best value found
    67       key = (ap_cache[bssid], lat, lon)
     77      key = (bssid, lat, lon)
    6878      signaal=(100 + int(level))
    6979      if meting_pool.has_key(key):
     
    7181      else:
    7282        meting_pool[key] = signaal
    73   if show_progres: output.write("%s\n" % count)
    7483
    7584
    76   if show_progres: output.write('#INFO: Importing the data into the database: ')
    77   meting_count = 0
    78   cursor = connection.cursor()
     85  # Determine which entries we need to add
     86  counters['ap_total'] = len(ap_pool)
     87  bssid_list_present = Accespoint.objects.filter(mac__in=ap_pool.keys()).values_list('mac', flat=True)
     88  bssid_list_insert = set(ap_pool.keys()) - set(bssid_list_present)
     89
     90  # Create a bulk import list and import
     91  if bssid_list_insert:
     92    sql_values = []
     93    for bssid in bssid_list_insert:
     94      ssid, encryption = ap_pool[bssid]
     95      # Special trick in SSID ts avoid escaping in later stage
     96      value_str = str((bssid,ssid.replace('%','%%'),encryption))
     97      sql_values.append(value_str)
     98  counters['ap_added'] = bulk_sql('gheat_accespoint (`mac`, `ssid`, `encryptie`)',sql_values)
     99
     100  # Build mapping for meting import
     101  mac2id = {}
     102  for mac,id in Accespoint.objects.filter(mac__in=ap_pool.keys()).values_list('mac','id'):
     103    mac2id[mac] = int(id)
     104
    79105  sql_values = []
    80   for (ap,lat,lon),signal in meting_pool.iteritems():
    81     try:
    82       if bulk_import:
    83         value_str = str((int(mr.id),int(ap.id),float(lat),float(lon),int(signaal)))
    84         sql_values.append(value_str)
    85       else:
    86         m = Meting.objects.create(meetrondje=mr, accespoint=ap, latitude=lat, longitude=lon, signaal=signaal)
    87     except IntegrityError, e:
    88       logging.error("Unable to import - %s" %  e)
    89       continue
    90 
    91     # Give some feedback to the user
    92     if show_progres: meting_count = user_feedback(output, meting_count, 100, 1000)
    93 
    94   # Make sure to include closing newline
    95   if show_progres: output.write("%s\n" % meting_count)
     106  for (bssid,lat,lon),signal in meting_pool.iteritems():
     107    value_str = str((int(meetrondje.id),mac2id[bssid],float(lat),float(lon),int(signaal)))
     108    sql_values.append(value_str)
    96109
    97110  # Bulk Import data if possible
    98   if bulk_import:
    99     if len(sql_values) == 0:
    100       logging.warn("No data to import")
    101     else:
    102       try:
    103         sql = "INSERT INTO gheat_meting (`meetrondje_id`, `accespoint_id`, `lat`, `lng`, `signaal`) VALUES %s" % ','.join(sql_values)
    104         cursor.execute(sql)
    105         transaction.commit_unless_managed()
    106       except IntegrityError, e:
    107         logging.error("Unable to import - %s" %  e)
    108         pass
    109 
    110   return (len(ap_cache), new_ap_count, meting_count, len(meting_pool) - meting_count)
     111  if sql_values:
     112    counters['meting_added'] = bulk_sql('gheat_meting (`meetrondje_id`, `accespoint_id`, `lat`, `lng`, `signaal`)',sql_values)
     113  return counters
    111114
    112115
     
    146149         datum = datetime.datetime.strptime(datum,'%Y-%m-%d-%H%M%S')
    147150      except ValueError:
    148         raise CommandError("Invalid date '%s'\n" % options['datum'])
     151        raise CommandError("Invalid date '%s'" % options['datum'])
    149152
    150       self.stdout.write('#INFO: Meetrondje: %s @ %s\n' % (meetrondje, datum))
    151       self.stdout.write("#INFO: Going to import '%s' for gebruiker '%s <%s>'\n" % (os.path.basename(csv_file), options['gebruiker'], options['email']))
    152       (ap_count, new_ap_count, meting_count, meting_error_count) = import_droidstumbler(csv_file,meetrondje,options['kaart'],options['gebruiker'],options['email'], datum, True)
    153       self.stdout.write("#INFO: Import summary accespoints: added:%s processed:%s\n" % (new_ap_count, ap_count))
    154       self.stdout.write("#INFO: Import summary metingen: added:%s error:%s\n" % (meting_count, meting_error_count))
     153      logger.info("Going to import '%s'" % (os.path.basename(csv_file)))
     154      # Create meetrondje object
     155      g, created = Gebruiker.objects.get_or_create(naam=options['gebruiker'] , email=options['email'])
     156      a, created = Apparatuur.objects.get_or_create(kaart=options['kaart'])
     157      mr, created = MeetRondje.objects.get_or_create(datum=datum , naam=meetrondje , gebruiker=g , apparatuur=a)
     158      logger.info('Meetrondje: %s @ %s' % (meetrondje, datum))
     159      if not created:
     160        logger.error("Meetrondje '%s' already imported" % mr)
     161        sys.exit(1)
     162      counters = import_droidstumbler(csv_file,mr)
     163      logger.info("Summary accespoints: added:%(ap_added)-6s processed:%(ap_total)-6s" % counters)
     164      logger.info("Summary metingen   : added:%(meting_added)-6s processed:%(meting_total)-6s" % counters)
Note: See TracChangeset for help on using the changeset viewer.