GPX解析エンジン test version
「GPXログをATGoogleMapsで(GPX解析エンジン作成編) 」 で作成したpython script。
Size 20.7 kB - File type text/python-source
File contents
from datetime import tzinfo, timedelta, datetime
import time
import math

# The script was written against PyXML's 4DOM reader.  When PyXML is not
# installed, fall back to the standard-library DOM parser, which exposes the
# same nodeName / nodeValue / attributes / childNodes interface used below.
try:
    from xml.dom.ext.reader.Sax import FromXmlStream
except ImportError:
    from xml.dom.minidom import parse as FromXmlStream

# PIL is only needed for drawing graphs.  Accept both the historical
# top-level module names and the "PIL" package; parsing and statistics keep
# working when neither is installed (SpeedGraph then raises ImportError).
try:
    import Image
    import ImageDraw
except ImportError:
    try:
        from PIL import Image
        from PIL import ImageDraw
    except ImportError:
        Image = None
        ImageDraw = None


class UTC(tzinfo):
    """Fixed-offset tzinfo for UTC."""

    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)


class JST(tzinfo):
    """Fixed-offset tzinfo for Japan Standard Time (UTC+9)."""

    def utcoffset(self, dt):
        return timedelta(hours=9)

    def tzname(self, dt):
        return "JST"

    def dst(self, dt):
        # Japan does not observe daylight saving time.
        return timedelta(0)


##########################################################################
#
# *.gpx file class
#  - <trkseg> boundaries are ignored: every <trkpt> in the file is treated
#    as part of one single track
#  - multiple <trk> elements are merged into that same track
#  - <wpt> elements are not collected (their <ele>/<time> are skipped)
#
##########################################################################
class GPXFileData:
    """Collects track points from a GPX DOM tree and derives statistics.

    After makeLogData() the result dictionary (see getLogData()) may hold:
      'creator' -- the creator attribute of <gpx>
      'name'    -- text of the last <name> element found in the document
      'trk'     -- list of point dicts with the keys 'lon', 'lat' (strings),
                   'ele' (string) and 'time' (JST datetime); 'ele' / 'time'
                   are removed from every point when no point carries them
    """

    def __init__(self, debug=0):
        self.gpx_data = {}
        self.debug = debug
        self.item_list = {
            'trk': ['lon', 'lat', 'ele', 'time'],
            'wpt': ['lon', 'lat', 'ele', 'time', 'cmt'],
        }

    def makeItemListStr(self, list):
        """Return the source text of a dict literal mapping each name to ''."""
        return "{" + "".join("'%s':''," % key for key in list) + "}"

    def getJSTTime(self, s):
        """Convert a GPX UTC timestamp string to an aware JST datetime.

        Accepts "YYYY-mm-ddTHH:MM:SSZ", optionally with fractional seconds
        (which are discarded).  The timestamp is assumed to be UTC.
        Returns None when the text cannot be parsed.
        """
        text = (s or '').strip()
        if text.endswith('Z') and '.' in text:
            # 2008-01-01T00:00:00.123Z -> 2008-01-01T00:00:00Z
            text = text.split('.', 1)[0] + 'Z'
        try:
            parsed = datetime.strptime(text, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            if self.debug:
                print("time data not found")
            return None
        ut = parsed.replace(tzinfo=UTC())
        jt = ut.astimezone(JST())
        if self.debug:
            print("UTC=%s JST=%s" % (ut, jt))
        return jt

    def initTrkItems(self, name):
        """Append a new point with empty fields to the list stored at name."""
        blank = dict.fromkeys(self.item_list[name], '')
        self.gpx_data.setdefault(name, []).append(blank)

    def setTrkItemList(self, name, value):
        """Store one field of the track point that is currently being read.

        'time' values are converted with getJSTTime().  The call is ignored
        when no track point has been started yet.
        """
        points = self.gpx_data.get('trk')
        if not points:
            return
        item = value
        if name == 'time':
            item = self.getJSTTime(value)
        points[-1][name] = item
        if self.debug:
            print("%s(%d)=%s" % (name, len(points), item))

    def setTrkItemValue(self, name, value):
        """Store a document-level value (e.g. 'name', 'creator')."""
        self.gpx_data[name] = value
        if self.debug:
            print("%s=%s\n" % (name, value))

    def getNodeAttr(self, node):
        """Read lat/lon from a <trkpt> node or creator from the <gpx> node."""
        name = node.nodeName
        attrs = node.attributes
        if attrs is None:
            return
        # item()/len() are the DOM NamedNodeMap accessors; plain iteration is
        # not supported by every DOM implementation.
        for index in range(len(attrs)):
            attr = attrs.item(index)
            if name == 'trkpt' and attr.name in ('lat', 'lon'):
                self.setTrkItemList(attr.name, attr.value)
            if name == 'gpx' and attr.name == 'creator':
                self.setTrkItemValue(attr.name, attr.value)

    def _isTrkptField(self, node):
        """Tell whether node is the content of a direct child of <trkpt>."""
        element = getattr(node, 'parentNode', None)
        owner = getattr(element, 'parentNode', None)
        return owner is not None and owner.nodeName == 'trkpt'

    def nodeInspect(self, node, pname=''):
        """Walk the DOM tree depth-first and collect the track data.

        node  -- DOM node to inspect
        pname -- nodeName of the parent of node ('' for the root)
        """
        name = node.nodeName
        value = node.nodeValue
        if name == 'gpx':
            self.getNodeAttr(node)
        elif name == 'trk':
            # keep points of earlier <trk> elements: all tracks are merged
            self.gpx_data.setdefault('trk', [])
        elif name == 'trkpt':
            self.initTrkItems('trk')
            self.getNodeAttr(node)

        if pname == "name":
            self.setTrkItemValue(pname, value)
        elif pname in ('ele', 'time') and self._isTrkptField(node):
            # <wpt> carries <ele>/<time> too; only track points are wanted
            self.setTrkItemList(pname, value)

        for child in node.childNodes:
            self.nodeInspect(child, name)

    def checkTrkData(self, trk_data):
        """Normalise the point list in place.

        Drops the 'time' (or 'ele') key from every point when no point has a
        usable value for it, then removes points whose lat/lon is missing,
        non-numeric or 0, and points lacking a value for a kept column.
        """
        for key in ('time', 'ele'):
            present = False
            for point in trk_data:
                if point.get(key) not in ('', None):
                    present = True
                    break
            if not present:
                for point in trk_data:
                    point.pop(key, None)

        def usable(point):
            try:
                if float(point['lon']) == 0 or float(point['lat']) == 0:
                    return False
                if 'ele' in point:
                    float(point['ele'])
            except (KeyError, TypeError, ValueError):
                return False
            if 'time' in point and point['time'] in ('', None):
                return False
            return True

        kept = [point for point in trk_data if usable(point)]
        removed = len(trk_data) - len(kept)
        trk_data[:] = kept
        if self.debug and removed:
            print("checkTrkData() : illegal data found")

    def makeLogData(self, stream):
        """Collect the data from the DOM document and validate the track."""
        self.nodeInspect(stream)
        # data matching & check
        if 'trk' in self.gpx_data:
            self.checkTrkData(self.gpx_data['trk'])

    def getLogData(self):
        """Return the dictionary of collected data."""
        return self.gpx_data

    def getGoogleJSList(self):
        """Return JavaScript pushing every point as a GLatLng.

        The text fills `points` with all track points and `wpt_points` with
        the first and last one.  Returns "" when there is no track point.
        """
        data = self.gpx_data.get('trk')
        if not data or 'lat' not in data[0] or 'lon' not in data[0]:
            return ""
        first = data[0]
        last = data[-1]
        parts = ["//point count=%d\n" % len(data)]
        for point in data:
            parts.append("points.push(new GLatLng(%s,%s));\n"
                         % (point['lat'], point['lon']))
        parts.append("//start point\nwpt_points.push(new GLatLng(%s,%s));\n"
                     % (first['lat'], first['lon']))
        parts.append("//end point\nwpt_points.push(new GLatLng(%s,%s));\n"
                     % (last['lat'], last['lon']))
        return "".join(parts)

    def chgDegreeToRadian(self, data):
        """Convert degrees to radians."""
        return math.radians(data)

    def getDistance(self, data1, data2):
        """Return the distance in metres between two points.

        Uses the equirectangular approximation on a sphere with the WGS84
        equatorial radius: the east-west component is the longitude
        difference scaled by the cosine of the mean latitude.
        """
        a = 6378137
        lat_1 = self.chgDegreeToRadian(float(data1['lat']))
        lat_2 = self.chgDegreeToRadian(float(data2['lat']))
        lon_1 = self.chgDegreeToRadian(float(data1['lon']))
        lon_2 = self.chgDegreeToRadian(float(data2['lon']))
        east = a * (lon_2 - lon_1) * math.cos((lat_1 + lat_2) / 2.0)
        north = a * (lat_2 - lat_1)
        return math.sqrt(east ** 2 + north ** 2)

    def getMaxMinAvgTotal(self, list):
        """Summarise a list of numbers or of timedelta objects.

        Returns a dict with 'max', 'min', 'avg' and 'total'.  For timedelta
        input 'min' / 'max' are the timestamps of the first / last track
        point instead of the extreme intervals.  An empty list yields zeros.
        """
        values = [value for value in list]
        if not values:
            return {'max': 0.0, 'min': 0.0, 'avg': 0.0, 'total': 0.0}

        is_time = isinstance(values[0], timedelta)
        if self.debug:
            print(type(values[0]))
        if is_time:
            total = timedelta(0)
        else:
            total = 0.0
        for value in values:
            total += value

        rtn = {}
        if is_time:
            trk = self.gpx_data['trk']
            rtn['min'] = trk[0]['time']
            rtn['max'] = trk[-1]['time']
        else:
            rtn['max'] = max(values)
            rtn['min'] = min(values)
        rtn['avg'] = total / len(values)
        rtn['total'] = total
        return rtn

    def _totalSeconds(self, delta):
        """Return the full length of a timedelta in seconds (days included)."""
        return delta.days * 86400 + delta.seconds + delta.microseconds / 1e6

    def getRunningData(self):
        """Compute per-segment distance, duration, speed and elevation.

        Returns a dict with:
          'dist'  -- metres between consecutive points
          'time'  -- timedelta between consecutive points   (if timestamps)
          'speed' -- km/h for each segment; 0.0 when the interval is not
                     positive                                (if timestamps)
          'ele'   -- elevation of each point after the first, negative
                     values clamped to 0                     (if elevations)
          'dist_data', 'time_data', 'speed_data', 'ele_data'
                  -- getMaxMinAvgTotal() summaries of the lists above
        """
        d = self.gpx_data.get('trk') or []
        has_time = bool(d) and 'time' in d[0]
        has_ele = bool(d) and 'ele' in d[0]

        rtn = {'dist': []}
        if has_time:
            rtn['time'] = []
            rtn['speed'] = []
        if has_ele:
            rtn['ele'] = []

        for i in range(1, len(d)):
            dist = self.getDistance(d[i - 1], d[i])
            rtn['dist'].append(dist)
            if has_time:
                time_dist = d[i]['time'] - d[i - 1]['time']
                seconds = self._totalSeconds(time_dist)
                if seconds > 0:
                    speed = dist / seconds * 60 * 60 / 1000   # m/s -> km/h
                else:
                    # identical or out-of-order timestamps: no usable speed
                    speed = 0.0
                rtn['speed'].append(speed)
                rtn['time'].append(time_dist)
            if has_ele:
                ele = float(d[i]['ele'])
                if ele < 0:
                    ele = 0
                rtn['ele'].append(ele)

        rtn['dist_data'] = self.getMaxMinAvgTotal(rtn['dist'])
        if self.debug:
            print("total distance : %.3fkm"
                  % (rtn['dist_data']['total'] / 1000))
        if has_time:
            rtn['time_data'] = self.getMaxMinAvgTotal(rtn['time'])
            if self.debug:
                print("time : start=%s end=%s total=%s"
                      % (rtn['time_data']['min'], rtn['time_data']['max'],
                         rtn['time_data']['total']))
            rtn['speed_data'] = self.getMaxMinAvgTotal(rtn['speed'])
            if self.debug:
                print("speed(max=%.03fkm/h min=%.03fkm/h avg=%.03fkm/h)"
                      % (rtn['speed_data']['max'], rtn['speed_data']['min'],
                         rtn['speed_data']['avg']))
        if has_ele:
            rtn['ele_data'] = self.getMaxMinAvgTotal(rtn['ele'])
            if self.debug:
                print("elevation(max=%dm min=%dm avg=%.03fm)"
                      % (rtn['ele_data']['max'], rtn['ele_data']['min'],
                         rtn['ele_data']['avg']))
        return rtn


class SpeedGraph:
    """Draws a distance/speed or distance/elevation line graph as a PNG.

    fname   -- output file name used by end()
    mode    -- "speed" (y axis in km/h) or anything else (y axis in metres)
    size    -- (width, height) of the plotting area in pixels
    bgcol   -- RGB colour of the plotting area
    basecol -- RGB colour of the surrounding margin
    gcol    -- RGB colour of the graph line
    """

    def __init__(self, fname, mode, size=None, bgcol=None, basecol=None,
                 gcol=None, debug=0):
        if Image is None or ImageDraw is None:
            raise ImportError("SpeedGraph requires PIL (Image, ImageDraw)")
        self.offset = 30
        self.size = (600, 400)
        self.bgcol = (255, 255, 255)
        self.basecol = (237, 234, 229)
        self.gcol = (221, 0, 0)
        self.lcol = (0, 0, 0)
        self.lcol_sub = (224, 224, 224)
        self.im = None
        self.debug = debug
        self.x_unit = {}
        self.y_unit = {}
        self.fname = fname
        self.mode = mode
        if size:
            self.size = size
        if bgcol:
            self.bgcol = bgcol
        if basecol:
            self.basecol = basecol
        if gcol:
            self.gcol = gcol
        canvas = (self.size[0] + self.offset * 2,
                  self.size[1] + self.offset * 2)
        self.im = Image.new("RGB", canvas, self.basecol)

    def getFontImages(self):
        """Cut "font_image.png" into 16 equally high glyph images.

        Returns a dict: 'size' is the (width, height) of one glyph, 'num'
        the glyphs for the digits 0-9, and 'km_h' / 'km' / 'm' glyph
        sequences assembled from the remaining six cells.
        """
        tim = Image.open("font_image.png")
        size = (tim.size[0], tim.size[1] // 16)
        imglist = {'size': size, 'num': []}
        extra = []
        if self.debug:
            print("getFontImages : crop unit(x=%d,y=%d)" % (size[0], size[1]))
        for i in range(16):
            top = i * size[1]
            region = tim.crop((0, top, size[0], top + size[1]))
            if i < 10:
                imglist['num'].append(region)
            else:
                extra.append(region)
        imglist['km_h'] = [extra[4], extra[0], extra[1], extra[2], extra[3],
                           extra[5]]
        imglist['km'] = [extra[4], extra[0], extra[1], extra[5]]
        imglist['m'] = [extra[4], extra[1], extra[5]]
        return imglist

    def getGraphBaseSquare(self, offset=0):
        """Return the corners of the plotting area grown by offset pixels.

        Order: top-left, top-right, bottom-right, bottom-left.
        """
        left = self.offset - offset
        top = self.offset - offset
        right = self.size[0] + self.offset + offset
        bottom = self.size[1] + self.offset + offset
        sq = [(left, top), (right, top), (right, bottom), (left, bottom)]
        if self.debug:
            print("getGraphBaseSquare(%d) : %s" % (offset, sq))
        return sq

    def disctionLineUnit(self, width, minls, maxv, disits):
        """Choose the spacing of the grid lines along one axis.

        width  -- size of the plotting area along the axis (px)
        minls  -- minimum distance between two grid lines (px)
        maxv   -- largest value to be plotted (metres)
        disits -- candidate magnitudes (10, 100, 1000, ...) in metres

        Returns a dict with 'ldigit' and 'lstep' (a grid line every
        ldigit * lstep units), 'lunit' ('m' or 'km'; for 'km' ldigit is
        expressed in kilometres) and 'px_par_val' (pixels per metre).
        """
        f_width = float(width)
        f_maxv = float(maxv)
        if f_maxv <= 0:
            # nothing to scale against; avoid dividing by zero
            f_maxv = 1.0
        px_par_val = f_width / f_maxv
        lp = f_width / minls            # number of lines that fit

        candidates = [(digit, step) for digit in disits
                      for step in (1, 2, 5)]
        # keep extending by decades so very large values still get a fit
        digit = candidates[-1][0]
        for _ in range(32):
            digit = digit * 10
            candidates.extend((digit, step) for step in (1, 2, 5))

        chosen = candidates[-1]
        for digit, step in candidates:
            if digit * step * lp >= f_maxv:
                chosen = (digit, step)
                break

        rtn_unit = {'ldigit': chosen[0], 'lstep': chosen[1],
                    'px_par_val': px_par_val}
        # m or km ?
        if rtn_unit['ldigit'] >= 1000:
            rtn_unit['lunit'] = 'km'
            rtn_unit['ldigit'] = rtn_unit['ldigit'] // 1000
        else:
            rtn_unit['lunit'] = 'm'
        if self.debug:
            print(rtn_unit)
        return rtn_unit

    def _pasteGlyphs(self, glyphs, left, top, cell):
        """Paste glyph images side by side starting at (left, top)."""
        width, height = cell
        for i, glyph in enumerate(glyphs):
            x = left + i * width
            self.im.paste(glyph, (x, top, x + width, top + height))

    def makeGraphBase(self, dist, speed):
        """Draw the plotting area, the grid lines and the axis labels.

        dist  -- total distance in metres (x axis maximum)
        speed -- maximum speed in km/h in "speed" mode, otherwise the
                 maximum elevation in metres (y axis maximum)
        """
        draw = ImageDraw.Draw(self.im)
        nfont = self.getFontImages()
        imsize = nfont['size']
        digits = nfont['num']

        # graph base square: black frame one pixel larger than the area
        draw.polygon(self.getGraphBaseSquare(1), fill=self.lcol)
        basebox = self.getGraphBaseSquare(0)
        draw.polygon(basebox, fill=self.bgcol)
        org_xy = basebox[0]
        max_xy = basebox[2]

        ###################################################################
        # x axis: distance.  Minimum line spacing = room for a four digit
        # label plus padding.
        min_ls_x = imsize[0] * 4 + 8 + 8
        self.x_unit = self.disctionLineUnit(
            self.size[0], min_ls_x, dist, (10, 100, 1000, 10000, 100000))
        unit_offset = 1000
        if self.x_unit['lunit'] == 'm':
            unit_offset = 1
        step = self.x_unit['ldigit'] * self.x_unit['lstep']
        str_int = 0
        px = 0
        while px < self.size[0]:
            x = int(org_xy[0] + (px + 0.5))
            draw.line([(x, org_xy[1]), (x, max_xy[1])], fill=self.lcol_sub)
            fontstr = "%s" % str_int
            if self.debug:
                print("x label %s" % fontstr)
            self._pasteGlyphs([digits[int(ch)] for ch in fontstr],
                              x, max_xy[1] + 3, imsize)
            str_int = str_int + step
            px = str_int * self.x_unit['px_par_val'] * unit_offset
        # put unit string [m] or [km]
        self._pasteGlyphs(nfont[self.x_unit['lunit']],
                          max_xy[0] + 3, max_xy[1] + 3 + imsize[1], imsize)

        ###################################################################
        # y axis: km/h or elevation (m).  Minimum line spacing = glyph
        # height plus padding.
        min_ls_y = imsize[1] + 13
        if self.mode == "speed":
            if self.debug:
                print("mode speed graph")
            # speed is in km/h: drop the fraction and pass it in metres
            self.y_unit = self.disctionLineUnit(
                self.size[1], min_ls_y, int(speed) * 1000, (10000, 100000))
        else:
            if self.debug:
                print("mode elevation graph")
            # elevation is already in metres
            self.y_unit = self.disctionLineUnit(
                self.size[1], min_ls_y, speed,
                (10, 100, 1000, 10000, 100000))
        unit_offset = 1000
        if self.y_unit['lunit'] == 'm':
            unit_offset = 1
        step = self.y_unit['ldigit'] * self.y_unit['lstep']
        str_int = 0
        px = 0
        while px < self.size[1]:
            y = int(max_xy[1] - px)
            draw.line([(org_xy[0], y), (max_xy[0], y)], fill=self.lcol)
            fontstr = "%s" % str_int
            if self.debug:
                print("y label %s" % fontstr)
            # right-align the label against the left edge of the area
            left = org_xy[0] - 3 - len(fontstr) * imsize[0]
            self._pasteGlyphs([digits[int(ch)] for ch in fontstr],
                              left, y - imsize[1], imsize)
            str_int = str_int + step
            px = str_int * self.y_unit['px_par_val'] * unit_offset
        # put unit string [km/h] (or [m] / [km] for elevation)
        if self.mode == "speed":
            st_font = nfont['km_h']
        else:
            st_font = nfont[self.y_unit['lunit']]
        self._pasteGlyphs(st_font,
                          org_xy[0] - imsize[0] * 3 - 3,
                          org_xy[1] - 3 - (imsize[1] * 2), imsize)

    def drawLineGraph(self, x_list, y_list):
        """Plot y_list against the running sum of x_list.

        x_list -- distance of every segment in metres (first entry 0)
        y_list -- km/h in "speed" mode, otherwise metres
        makeGraphBase() must have been called first to set the axis scales.
        """
        draw = ImageDraw.Draw(self.im)
        basebox = self.getGraphBaseSquare(0)
        org_xy = basebox[3]     # bottom-left corner is the graph origin
        max_xy = basebox[1]
        x_par = self.x_unit['px_par_val']
        y_par = self.y_unit['px_par_val']
        # the y scale is in pixels per metre: km/h values are scaled by 1000
        unit_offset = 1
        if self.mode == "speed":
            unit_offset = 1000
        if self.debug:
            print("basebox : %s %s" % (org_xy, max_xy))
            print("1px/x(m)=%f 1px/y(m)=%f" % (x_par, y_par))

        dist_all = 0.0
        for i in range(1, len(x_list)):
            segment = float(x_list[i])
            x_px1 = int(x_par * dist_all)
            y_px1 = int(y_par * float(y_list[i - 1]) * unit_offset)
            x_px2 = int(x_par * (dist_all + segment))
            y_px2 = int(y_par * float(y_list[i]) * unit_offset)
            draw.line([(org_xy[0] + x_px1, org_xy[1] - y_px1),
                       (org_xy[0] + x_px2, org_xy[1] - y_px2)],
                      fill=self.gcol)
            dist_all += segment

    def end(self):
        """Write the image to self.fname as PNG."""
        self.im.save(self.fname, "PNG")


def process_file(file, debug=1):
    """Parse one GPX stream, write its graphs and return the JS point list.

    file  -- file name or open file object containing GPX XML
    debug -- debug level passed to GPXFileData and SpeedGraph

    Writes "<name>_speed.png" when the track has timestamps and
    "<name>_ele.png" when it has elevations ("gpx" is used when the file
    has no <name>).  Returns None when the file cannot be parsed or holds
    no usable track point.
    """
    gpx = GPXFileData(debug=debug)
    try:
        doc = FromXmlStream(file)
    except Exception:
        print("[WARNING!] gpx file was broken [WARNING!]")
        return None
    gpx.makeLogData(doc)
    d = gpx.getLogData()
    if not d.get('trk'):
        return None

    rundata = gpx.getRunningData()
    base = d.get('name') or 'gpx'
    # the lists describe segments; prepend the starting point
    rundata['dist'].insert(0, 0)
    if 'time' in rundata:
        rundata['speed'].insert(0, 0)
        gs = SpeedGraph(fname=base + "_speed.png", mode="speed", debug=debug)
        gs.makeGraphBase(rundata['dist_data']['total'],
                         rundata['speed_data']['max'])
        gs.drawLineGraph(rundata['dist'], rundata['speed'])
        gs.end()
    if 'ele' in rundata:
        rundata['ele'].insert(0, 0)
        ge = SpeedGraph(fname=base + "_ele.png", mode="ele", debug=debug)
        ge.makeGraphBase(rundata['dist_data']['total'],
                         rundata['ele_data']['max'])
        ge.drawLineGraph(rundata['dist'], rundata['ele'])
        ge.end()
    return gpx.getGoogleJSList()


# library test/debug function (dump given files)
if __name__ == '__main__':
    import sys
    import gc

    gc.enable()
    # time.clock() no longer exists on current Python 3 releases
    timer = getattr(time, 'perf_counter', None) or time.clock
    start_time = timer()
    start = len(gc.get_objects())
    print("%s object at the beginning" % start)

    if len(sys.argv) < 2:
        print('Usage: %s files...\n' % sys.argv[0])
        sys.exit(0)

    for filename in sys.argv[1:]:
        try:
            stream = open(filename, 'rb')
        except IOError:
            print("%s unreadable" % filename)
            print("")
            continue
        print(filename + ':')
        try:
            data = process_file(stream)
        finally:
            stream.close()
        if data:
            print(len(data))
        else:
            print('No GPX information found')

    end_time = timer()
    duration = end_time - start_time
    end = len(gc.get_objects())
    print("")
    print("---------------------------------------------")
    print("%s object at the end" % end)
    print("so %s object have been created for one photo" % (end - start))
    print("---------------------------------------------")
    print("Time needed : %s" % duration)