GPX解析エンジン test varsion
「GPXログをATGoogleMapsで(GPX解析エンジン作成編) 」 で作成したpython script。
Size 20.7 kB - File type text/python-sourceFile contents
from xml.dom.ext.reader.Sax import FromXmlStream
from datetime import tzinfo, timedelta, datetime
import time
import math
import Image
import ImageDraw
class UTC(tzinfo):
"""UTC"""
def utcoffset(self, dt):
return timedelta(0)
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return timedelta(0)
class JST(tzinfo):
"""JST"""
def utcoffset(self, dt):
return timedelta(hours=9)
def tzname(self, dt):
return "JST"
def dst(self, dt):
return timedelta(hours=9)
##########################################################################
#
# *.gpx file Class
# ・<trkseg>は無視でfile内にある<trkpt>をいっしょくたに扱ってたりする
# ・<trk>が複数出てもいっしょくた(多分)
# ・<wpt>は・・・どうしよーか
#
##########################################################################
class GPXFileData :
def __init__(self, debug=0):
self.gpx_data = {}
self.debug = debug
self.item_list = {'trk':['lon', 'lat', 'ele', 'time'],'wpt':['lon', 'lat', 'ele', 'time', 'cmt']}
def makeItemListStr( self, list ) :
s = "{"
for i in list :
s += "'%s':''," % i
s += "}"
return s
def getJSTTime( self, s ) :
# GPXにある時刻はUTCと決めつけて処理
# UTC -> JST 変換はこの辺を参照してtestしまくった...orz
# http://www.python.jp/doc/2.4/lib/datetime-datetime.html
# http://www.python.jp/doc/2.4/lib/datetime-tzinfo.html
utc = UTC()
jst = JST()
# YYYY-mm-ddTHH:MM:SSZ ?
if len("YYYY-mm-ddTHH:MM:SSZ") != len( s ) :
if self.debug : print "time data not found"
return None
t = time.strptime( s, "%Y-%m-%dT%H:%M:%SZ")
ut = datetime( year=t[0],month=t[1],day=t[2], hour=t[3], minute=t[4], second=t[5], tzinfo=utc )
jt = ut.astimezone(jst)
if self.debug : print "UTC=%s JST=%s" % (ut,jt)
return jt
def initTrkItems( self, name ) :
s = self.makeItemListStr( self.item_list[name] )
self.gpx_data[name].append(eval(s))
def setTrkItemList( self, name, value ):
item = value
if name == 'time' : item = self.getJSTTime( value )
now_trkpt = len(self.gpx_data['trk']) - 1
self.gpx_data['trk'][now_trkpt][name] = item
if self.debug: print "%s(%d)=%s" % (name,len(self.gpx_data['trk']), item)
def setTrkItemValue( self, name, value ):
self.gpx_data[name] = value
if self.debug: print "%s=%s\n" % (name, value)
def getNodeAttr( self, node ):
name = node.nodeName
attrs = node.attributes
if attrs != None and len(attrs) > 0:
for a in attrs:
if name == 'trkpt' and (a.name == 'lat' or a.name == 'lon'):
self.setTrkItemList( a.name, a.value )
if name == 'gpx' and a.name == 'creator':
self.setTrkItemValue( a.name, a.value )
def nodeInspect( self, node, pname='' ):
name = node.nodeName
value = node.nodeValue
if name == 'gpx':
self.getNodeAttr( node )
elif name == 'trk' :
self.gpx_data['trk'] = []
elif name == 'trkpt' :
self.initTrkItems( 'trk' )
if pname == "name" :
self.setTrkItemValue( pname, value )
elif pname == "trkseg":
self.getNodeAttr( node )
elif pname in ('ele','time') :
self.setTrkItemList( pname, value )
children = node.childNodes
for c in children:
self.nodeInspect( c, name )
def checkTrkData( self, trk_data ) :
# time check (all none ?)
timef = 0
for j in trk_data :
if j['time'] != '' :
timef = 1
break
if timef == 0 :
for j in trk_data :
del( j['time'] )
elef = 0
for j in trk_data :
if j['ele'] != '' :
elef = 1
break
if elef == 0 :
for j in trk_data :
del( j['ele'] )
dellist = []
for i in range( len(trk_data) ) :
if eval(trk_data[i]['lon']) == 0 :
dellist.append(i)
elif eval(trk_data[i]['lat']) == 0 :
dellist.append(i)
elif trk_data[i].has_key('time') :
if trk_data[i]['time'] == '' or None :
dellist.append(i)
elif trk_data[i].has_key('ele') :
if trk_data[i]['ele'] == '' :
dellist.append(i)
dellist.reverse()
for i in dellist :
del( trk_data[i] )
if self.debug and len(dellist) : print "checkTrkData() : illegal data found"
def makeLogData( self, stream ):
self.nodeInspect( stream )
# data matching & check
if self.gpx_data.has_key('trk') :
self.checkTrkData( self.gpx_data['trk'] )
def getLogData( self ) :
return self.gpx_data
def getGoogleJSList( self ) :
gpx_data = self.gpx_data
rtn = ""
if gpx_data.has_key('trk') :
data = gpx_data['trk']
if data[0].has_key('lat') and data[0].has_key('lon') :
rtn = "//point count=%d\n" % len(data)
for i in data :
rtn += "points.push(new GLatLng(%s,%s));\n" % (i['lat'],i['lon'])
rtn += "//start point\nwpt_points.push(new GLatLng(%s,%s));\n" % (data[0]['lat'],data[0]['lon'])
rtn += "//end point\nwpt_points.push(new GLatLng(%s,%s));\n" % (data[len(data)-1]['lat'],data[len(data)-1]['lon'])
return rtn
def chgDegreeToRadian( self, data ) :
return (data / 360 * 2 * 3.14159265)
def getDistance( self, data1, data2 ) :
# http://wadati.blog10.fc2.com/blog-entry-345.html
a = 6378137
lat_1 = self.chgDegreeToRadian( eval(data1['lat']) )
lat_2 = self.chgDegreeToRadian( eval(data2['lat']) )
lon_1 = self.chgDegreeToRadian( eval(data1['lon']) )
lon_2 = self.chgDegreeToRadian( eval(data2['lon']) )
lat = lat_2 - lat_1
lon = lon_2 - lon_1
lat_x = a * lat * math.cos(lon_1)
lon_y = a * lon
return math.sqrt( lat_x**2 + lon_y**2 )
def getMaxMinAvgTotal( self, list ) :
total = timedelta(0)
rtn = {}
l_list = []
is_time = 0
print type(list[0])
if type( list[0] ) == type(total) :
is_time = 1
else :
total = 0.0
for i in list :
l_list.append( i )
total += i
if is_time == 1 :
rtn['min'] = self.gpx_data['trk'][0]['time']
rtn['max'] = self.gpx_data['trk'][len(self.gpx_data['trk'])-1]['time']
else :
l_list.sort()
l_list.reverse()
rtn['max'] = l_list[0]
rtn['min'] = l_list[len(l_list)-1]
rtn['avg'] = total / len(l_list)
rtn['total'] = total
return rtn
def getRunningData( self ) :
d = self.gpx_data['trk']
rtn = {}
rtn['dist'] = []
if d[0].has_key('time') :
rtn['time'] = []
rtn['speed'] = []
if d[0].has_key('ele') :
rtn['ele'] = []
for i in range( 1, len(d) ) :
dist = self.getDistance( d[i-1],d[i] )
rtn['dist'].append(dist)
if rtn.has_key( 'time' ) :
time_dist = d[i]['time'] - d[i-1]['time']
s = dist / time_dist.seconds
s = s * 60 * 60 /1000
rtn['speed'].append(s)
rtn['time'].append(time_dist)
if rtn.has_key( 'ele' ) :
ele = eval(d[i]['ele'])
if ele < 0 : ele = 0
rtn['ele'].append(ele)
rtn['dist_data'] = self.getMaxMinAvgTotal( rtn['dist'] )
if self.debug : print "total distance : %.3fkm" % (rtn['dist_data']['total']/1000)
if rtn.has_key( 'time' ) :
rtn['time_data'] = self.getMaxMinAvgTotal( rtn['time'] )
if self.debug : print "time : start=%s end=%s total=%s" % (rtn['time_data']['min'],rtn['time_data']['max'],rtn['time_data']['total'])
rtn['speed_data'] = self.getMaxMinAvgTotal( rtn['speed'] )
if self.debug : print "speed(max=%.03fkm/h min=%.03fkm/h avg=%.03fkm/h)" % (rtn['speed_data']['max'], rtn['speed_data']['min'],rtn['speed_data']['avg'])
if rtn.has_key( 'ele' ) :
rtn['ele_data'] = self.getMaxMinAvgTotal( rtn['ele'] )
if self.debug : print "elevation(max=%dm min=%dm avg=%.03fm)" % (rtn['ele_data']['max'], rtn['ele_data']['min'],rtn['ele_data']['avg'])
return rtn
class SpeedGraph :
def __init__(self, fname, mode, size=None, bgcol=None, basecol=None, gcol=None, debug=0):
self.offset = 30
self.size = (600,400)
self.bgcol = (255,255,255)
self.basecol = (237,234,229)
self.gcol = (221,0,0)
self.lcol = (0,0,0)
self.lcol_sub = (224,224,224)
self.im = None
self.debug = debug
self.x_unit = {}
self.y_unit = {}
self.fname = fname
self.mode = mode
if size : self.size = size
if bgcol : self.bgcol = bgcol
if basecol : self.basecol = basecol
if gcol : self.gcol = gcol
size = (self.size[0]+self.offset*2,self.size[1]+self.offset*2)
self.im = Image.new( "RGB", size, self.basecol )
def getFontImages( self ) :
imglist = {}
t = []
tim = Image.open("font_image.png")
size = ( tim.size[0], (tim.size[1] / 16) )
imglist['size'] = size
imglist['num'] = []
if self.debug : print "getFontImages : crop unit(x=%d,y=%d)" % (size[0],size[1])
for i in range(0,16) :
step = i * size[1]
region = tim.crop( ( 0,step,size[0],step+size[1]) )
if i < 10 :
imglist['num'].append( region )
else :
t.append( region )
imglist['km_h'] = [t[4],t[0],t[1],t[2],t[3],t[5]]
imglist['km'] = [t[4],t[0],t[1],t[5]]
imglist['m'] = [t[4],t[1],t[5]]
return imglist
def getGraphBaseSquare( self, offset=0 ) :
sq = []
sq.append( (self.offset-offset, self.offset-offset ) )
sq.append( (self.size[0]+self.offset+offset, self.offset-offset ) )
sq.append( (self.size[0]+self.offset+offset, self.size[1]+self.offset+offset) )
sq.append( (self.offset-offset, self.size[1]+self.offset+offset) )
if self.debug : print "getGraphBaseSquare(%d) : %s" % ( offset, sq )
return sq
def disctionLineUnit( self, width, minls, maxv, disits ) :
# width = グラフエリアの最大幅(px)
# minls = ラインの最小間隔(px)
# maxv = 距離とかの最大値(メートル)
# disits = (list)最大桁リスト(10,100,1000,10000..) これも単位はメートル
line_dist = []
rtn_unit = {}
f_width = float(width)
f_maxv = float(maxv)
# graph max width(height) : ? px = max value : 1
px_par_val = f_width / f_maxv
lp = f_width / minls
# if width % minls : lp += 1
for i in disits :
for j in (1,2,5) :
line_dist.append( {'lval': i*j, 'ldigit': i, 'lstep': j, 'maxval': i*j*lp} )
for i in line_dist :
if i['maxval'] >= f_maxv :
rtn_unit = i
break
# 1value当たりのpx
if self.debug : print rtn_unit
rtn_unit['px_par_val']= px_par_val
# m or km ?
if rtn_unit['ldigit'] >= 1000 :
rtn_unit['lunit'] = 'km'
rtn_unit['ldigit'] = rtn_unit['ldigit'] / 1000
else :
rtn_unit['lunit'] = 'm'
del(rtn_unit['maxval'])
del(rtn_unit['lval'])
if self.debug : print rtn_unit
return rtn_unit
def makeGraphBase(self, dist, speed ) :
draw = ImageDraw.Draw(self.im)
nfont = self.getFontImages( )
imsize = nfont['size']
# graph base square
basebox = self.getGraphBaseSquare( 1 )
draw.polygon( basebox, fill=self.lcol)
basebox = self.getGraphBaseSquare( 0 )
draw.polygon( basebox, fill=self.bgcol)
org_xy = ( basebox[0][0], basebox[0][1] )
max_xy = ( basebox[2][0], basebox[2][1] )
#######################################################################
# dist(m) line(max ...10000kmは対応してると思う)
# 最小ライン幅決定(=fontの縦サイズ×千桁(4))
# default = 6px * 4 = 24px
min_ls_x = imsize[0] * 4 + 8
min_ls_x += 8
self.x_unit = self.disctionLineUnit( self.size[0], min_ls_x, dist, (10,100,1000,10000,100000) )
str_int = 0
px = 0
unit_offset = 1000
if self.x_unit['lunit'] == 'm' : unit_offset = 1
while px < self.size[0] :
x = eval("%d" % (org_xy[0] + (px+0.5)) )
draw.line( [(x,org_xy[1]),(x,max_xy[1])], fill=self.lcol_sub )
fontstr = "%s" % str_int
for i in range(0,len(fontstr)) :
if self.debug : print "%s %d" % (fontstr,eval(fontstr[i]))
box = (x+(i*imsize[0]), max_xy[1]+3, x+(i*imsize[0])+imsize[0], max_xy[1]+3+imsize[1])
self.im.paste( nfont['num'][eval(fontstr[i])], box )
str_int = str_int + (self.x_unit['ldigit'] * self.x_unit['lstep'])
px = str_int * self.x_unit['px_par_val'] * unit_offset
# put unit string [m] or [km]
f_org_x = max_xy[0] + 3
f_org_y = max_xy[1] + 3 + imsize[1]
for i in range( 0,len( nfont[self.x_unit['lunit']] ) ) :
box = (f_org_x + (i*imsize[0]), f_org_y, f_org_x + (i*imsize[0]) + imsize[0], f_org_y + imsize[1])
self.im.paste( nfont[self.x_unit['lunit']][i], box )
#######################################################################
# km/h or 高度(m) line(max ...10000km/hは対応してると思う)
# 最小ライン幅決定(=fontの縦サイズ)
# default = 8px
min_ls_y = imsize[1]
min_ls_y += 13
if self.mode == "speed" :
if self.debug : print "mode speed graph"
# 速度は%.fkm/hなので小数点以下丸め込みの上、メートルに変換して渡す
self.y_unit = self.disctionLineUnit( self.size[1], min_ls_y, eval("%d" % speed) * 1000, (10000,100000) )
else :
if self.debug : print "mode elevation graph"
# 高度はmなのでそのまま
self.y_unit = self.disctionLineUnit( self.size[1], min_ls_y, speed, (10,100,1000,10000,100000) )
str_int = 0
px = 0
unit_offset = 1000
if self.y_unit['lunit'] == 'm' : unit_offset = 1
while px < self.size[1] :
y = eval("%d" % (max_xy[1] - px) )
draw.line( [(org_xy[0],y),(max_xy[0],y)],fill=self.lcol)
fontstr = "%s" % str_int
if self.debug : print len(fontstr)
for i in range(0,len(fontstr)) :
offset = len(fontstr) - i - 1
if self.debug : print "%s %d" % (fontstr,eval(fontstr[i]))
f_org_x = org_xy[0] - 3 - offset*imsize[0] - imsize[0]
f_org_y = y - imsize[1]
box = ( f_org_x, f_org_y, f_org_x + imsize[0], f_org_y + imsize[1] )
self.im.paste( nfont['num'][eval(fontstr[i])], box )
str_int = str_int + (self.y_unit['ldigit'] * self.y_unit['lstep'])
px = str_int * self.y_unit['px_par_val'] * unit_offset
# put unit string [km/h]
f_org_x = org_xy[0] - imsize[0]*3 - 3
f_org_y = org_xy[1] - 3 - (imsize[1]*2)
if self.mode == "speed" :
st_font = nfont['km_h']
else :
st_font = nfont[self.y_unit['lunit']]
for i in range( 0,len( st_font ) ) :
box = (f_org_x + (i*imsize[0]), f_org_y, f_org_x + (i*imsize[0]) + imsize[0], f_org_y+imsize[1] )
self.im.paste( st_font[i], box )
def drawLineGraph( self , x_list, y_list ) :
draw = ImageDraw.Draw(self.im)
basebox = self.getGraphBaseSquare( 0 )
org_xy = ( basebox[3][0], basebox[3][1] )
max_xy = ( basebox[1][0], basebox[1][1] )
# x = distance / y = speed
# x = distance / y = ele
dist_all = 0
x_par = self.x_unit['px_par_val']
y_par = self.y_unit['px_par_val']
unit_offset = 1
if self.mode == "speed" : unit_offset = 1000
if self.debug : print "basebox : %s %s" % (org_xy, max_xy)
if self.debug : print "1px/x(m)=%f 1px/y(m)=%f" % (x_par,y_par)
for i in range( 1, len(x_list) ) :
# print "len(x_list)=%d len(y_list)=%d x_list[%d]=%d y_list[%d]=%d" % (len(x_list),len(y_list),i,x_list[i],i,y_list[i])
f_dist_1 = float(x_list[i-1])
f_speed_1 = float(y_list[i-1])
f_dist_2 = float(x_list[i])
f_speed_2 = float(y_list[i])
x_px1 = eval( "%d" % (x_par * dist_all) )
y_px1 = eval( "%d" % (y_par * f_speed_1 * unit_offset) )
x_px2 = eval( "%d" % (x_par * (dist_all+f_dist_2) ) )
y_px2 = eval( "%d" % (y_par * f_speed_2 * unit_offset) )
xy = [( org_xy[0] + x_px1, org_xy[1] - y_px1), (org_xy[0] + x_px2, org_xy[1] - y_px2)]
#if self.debug : print "dist1=%d,dist2=%d" % (dist_all, dist_all+f_dist_2)
#if self.debug : print "x=(%.3f->%.3f y=%.3f->%.3f (%d,%d -> %d,%d)" % (f_dist_1,f_dist_2,f_speed_1,f_speed_2,x_px1,y_px1,x_px2,y_px2)
draw.line( xy, fill=self.gcol)
dist_all += f_dist_2
def end( self ):
self.im.save( self.fname ,"PNG")
def process_file(file):
gpx = GPXFileData( debug=1 )
try:
doc = FromXmlStream(file)
except:
print "[WARNING!] gpx file was broken [WARNING!]"
return None
gpx.makeLogData( doc )
d = gpx.getLogData( )
rundata = gpx.getRunningData( )
rundata['dist'].insert(0, 0)
if rundata.has_key( 'time' ) :
rundata['speed'].insert(0, 0)
gs = SpeedGraph( fname=d['name']+"_speed.png",mode="speed", debug=1)
gs.makeGraphBase( rundata['dist_data']['total'], rundata['speed_data']['max'] )
gs.drawLineGraph( rundata['dist'], rundata['speed'] )
gs.end()
if rundata.has_key( 'ele' ) :
rundata['ele'].insert(0, 0)
ge = SpeedGraph( fname=d['name']+"_ele.png",mode="ele", debug=1)
ge.makeGraphBase( rundata['dist_data']['total'], rundata['ele_data']['max'] )
ge.drawLineGraph( rundata['dist'], rundata['ele'] )
ge.end()
return gpx.getGoogleJSList()
# library test/debug function (dump given files)
if __name__ == '__main__':
import sys
import gc
import time
gc.enable()
start_time = time.clock()
start = len(gc.get_objects())
print "%s object at the beginning" % start
if len(sys.argv) < 2:
print 'Usage: %s files...\n' % sys.argv[0]
sys.exit(0)
for filename in sys.argv[1:]:
try:
file=open(filename, 'r')
except:
print filename, 'unreadable'
print
continue
print filename+':'
data=process_file(file)
if data:
print len(data)
# print data
else :
print 'No GPX information found'
file.close()
end_time = time.clock()
duration = end_time - start_time
end = len(gc.get_objects())
print ""
print "---------------------------------------------"
print "%s object at the end" % end
print "so %s object have been created for one photo" % (end-start)
print "---------------------------------------------"
print "Time needed : %s" % duration
Click here to get the file