Personal tools
You are here: Home Zope&Plone Tips ファイル置き場 GPX解析エンジン test varsion
Document Actions

GPX解析エンジン test varsion

by maru last modified 2007-05-17 11:46

「GPXログをATGoogleMapsで(GPX解析エンジン作成編) 」 で作成したpython script。

Click here to get the file

Size 20.7 kB - File type text/python-source

File contents

from xml.dom.ext.reader.Sax import FromXmlStream
from datetime import tzinfo, timedelta, datetime
import time
import math
import Image
import ImageDraw

class UTC(tzinfo):
    """UTC"""
    def utcoffset(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        return timedelta(0)

class JST(tzinfo):
    """JST"""
    def utcoffset(self, dt):
        return timedelta(hours=9)

    def tzname(self, dt):
        return "JST"

    def dst(self, dt):
        return timedelta(hours=9)

##########################################################################
#
#  *.gpx file Class 
#    ・<trkseg>は無視でfile内にある<trkpt>をいっしょくたに扱ってたりする
#    ・<trk>が複数出てもいっしょくた(多分)
#    ・<wpt>は・・・どうしよーか
#
##########################################################################

class GPXFileData :

    def __init__(self, debug=0):
        self.gpx_data = {}
        self.debug = debug
        self.item_list = {'trk':['lon', 'lat', 'ele', 'time'],'wpt':['lon', 'lat', 'ele', 'time', 'cmt']}


    def makeItemListStr( self, list ) :
        s = "{"
        for i in list :
            s += "'%s':''," % i
        s += "}"
        return s


    def getJSTTime( self, s ) :
        # GPXにある時刻はUTCと決めつけて処理
        # UTC -> JST 変換はこの辺を参照してtestしまくった...orz
        # http://www.python.jp/doc/2.4/lib/datetime-datetime.html
        # http://www.python.jp/doc/2.4/lib/datetime-tzinfo.html
        utc = UTC()
        jst = JST()
        # YYYY-mm-ddTHH:MM:SSZ ?
        if len("YYYY-mm-ddTHH:MM:SSZ") != len( s ) :
            if self.debug : print "time data not found"
            return None
        
        t = time.strptime( s, "%Y-%m-%dT%H:%M:%SZ")

        ut = datetime( year=t[0],month=t[1],day=t[2], hour=t[3], minute=t[4], second=t[5], tzinfo=utc )
        jt = ut.astimezone(jst)
        if self.debug : print "UTC=%s JST=%s" %  (ut,jt)

        return jt


    def initTrkItems( self, name ) :
        
        s = self.makeItemListStr( self.item_list[name] )
        self.gpx_data[name].append(eval(s))


    def setTrkItemList( self, name, value ):
        
        item = value
        if name == 'time' : item = self.getJSTTime( value )
        now_trkpt = len(self.gpx_data['trk']) - 1
        self.gpx_data['trk'][now_trkpt][name] = item
        if self.debug: print "%s(%d)=%s" % (name,len(self.gpx_data['trk']), item)


    def setTrkItemValue( self, name, value ):
        self.gpx_data[name] = value
        if self.debug: print "%s=%s\n" % (name, value)


    def getNodeAttr( self, node ):
        name  = node.nodeName
        attrs = node.attributes
        if attrs != None and len(attrs) > 0:
            for a in attrs:
                if name == 'trkpt' and (a.name == 'lat' or a.name == 'lon'):
                    self.setTrkItemList( a.name, a.value )
                if name == 'gpx' and a.name == 'creator':
                    self.setTrkItemValue( a.name, a.value )


    def nodeInspect( self, node, pname='' ):
        name = node.nodeName
        value = node.nodeValue

        if name == 'gpx':
            self.getNodeAttr( node )
        elif name == 'trk' :
            self.gpx_data['trk'] = []
        elif name == 'trkpt' :
            self.initTrkItems( 'trk' )

        if pname == "name" :
            self.setTrkItemValue( pname, value )
        elif pname == "trkseg":
            self.getNodeAttr( node )
        elif pname in ('ele','time') :
            self.setTrkItemList( pname, value )

        children = node.childNodes
        for c in children:
          self.nodeInspect( c, name )


    def checkTrkData( self, trk_data ) :
    
        # time check (all none ?)
        timef = 0
        for j in trk_data :
            if j['time'] != '' :
                timef = 1
                break
        
        if timef == 0 :
            for j in trk_data :
                del( j['time'] )

        elef  = 0
        for j in trk_data :
            if j['ele'] != '' :
                elef = 1
                break
        if elef == 0 :
            for j in trk_data :
                del( j['ele'] )

        dellist = []
        for i in range( len(trk_data) ) :
            if   eval(trk_data[i]['lon']) == 0 :
                dellist.append(i)
            elif eval(trk_data[i]['lat']) == 0 :
                dellist.append(i)
            elif trk_data[i].has_key('time') :
                if trk_data[i]['time'] == ''  or None :
                    dellist.append(i)
            elif trk_data[i].has_key('ele') :
                if trk_data[i]['ele']  == '' :
                    dellist.append(i)
        
        dellist.reverse()
        for i in dellist :
            del( trk_data[i] )

        if self.debug and len(dellist) : print "checkTrkData() : illegal data found"


    def makeLogData( self, stream ):
        
        self.nodeInspect( stream )
        
        # data matching & check
        if self.gpx_data.has_key('trk') :
            self.checkTrkData( self.gpx_data['trk'] )


    def getLogData( self ) :
        return self.gpx_data


    def getGoogleJSList( self ) :
        gpx_data = self.gpx_data
        rtn = ""

        if gpx_data.has_key('trk') :
            data = gpx_data['trk']
            if data[0].has_key('lat') and data[0].has_key('lon') :
                rtn = "//point count=%d\n" % len(data)
                for i in data :
                    rtn += "points.push(new GLatLng(%s,%s));\n" % (i['lat'],i['lon'])
            rtn += "//start point\nwpt_points.push(new GLatLng(%s,%s));\n" % (data[0]['lat'],data[0]['lon'])
            rtn += "//end   point\nwpt_points.push(new GLatLng(%s,%s));\n" % (data[len(data)-1]['lat'],data[len(data)-1]['lon'])

        return rtn

    def chgDegreeToRadian( self, data ) :
        return (data / 360 * 2 * 3.14159265)


    def getDistance( self, data1, data2 ) :
        # http://wadati.blog10.fc2.com/blog-entry-345.html
        a = 6378137

        lat_1 = self.chgDegreeToRadian( eval(data1['lat']) )
        lat_2 = self.chgDegreeToRadian( eval(data2['lat']) )
        lon_1 = self.chgDegreeToRadian( eval(data1['lon']) )
        lon_2 = self.chgDegreeToRadian( eval(data2['lon']) )

        lat = lat_2 - lat_1
        lon = lon_2 - lon_1
        lat_x = a * lat * math.cos(lon_1)
        lon_y = a * lon
        return math.sqrt( lat_x**2 + lon_y**2 )


    def getMaxMinAvgTotal( self, list ) :
        total = timedelta(0)
        rtn = {}
        l_list = []
        is_time = 0

        print type(list[0])
        
        if type( list[0] ) == type(total) :
            is_time = 1
        else :
            total = 0.0

        for i in list :
            l_list.append( i )
            total += i
        
        if is_time == 1 :
            rtn['min'] = self.gpx_data['trk'][0]['time']
            rtn['max'] = self.gpx_data['trk'][len(self.gpx_data['trk'])-1]['time']
        else :
            l_list.sort()
            l_list.reverse()
            rtn['max']   = l_list[0]
            rtn['min']   = l_list[len(l_list)-1]
            rtn['avg']   = total / len(l_list)
            
        rtn['total'] = total

        return rtn


    def getRunningData( self ) :
        
        d = self.gpx_data['trk']
        rtn = {}
        
        rtn['dist'] = []
        if d[0].has_key('time') :
            rtn['time']  = []
            rtn['speed'] = []
        if d[0].has_key('ele') :
            rtn['ele']   = []
        
        for i in range( 1, len(d) ) :
            dist = self.getDistance( d[i-1],d[i] )
            rtn['dist'].append(dist)

            if rtn.has_key( 'time' ) :
                time_dist = d[i]['time'] - d[i-1]['time']
                s = dist / time_dist.seconds
                s = s * 60 * 60 /1000
                rtn['speed'].append(s)
                rtn['time'].append(time_dist)
            
            if rtn.has_key( 'ele' ) :
                ele = eval(d[i]['ele'])
                if ele < 0 : ele = 0
                rtn['ele'].append(ele)
            
        rtn['dist_data']  = self.getMaxMinAvgTotal( rtn['dist'] )
        if self.debug : print "total distance : %.3fkm" % (rtn['dist_data']['total']/1000)
        if rtn.has_key( 'time' ) :
            rtn['time_data'] = self.getMaxMinAvgTotal( rtn['time'] )
            if self.debug : print "time : start=%s end=%s total=%s" % (rtn['time_data']['min'],rtn['time_data']['max'],rtn['time_data']['total'])
            rtn['speed_data'] = self.getMaxMinAvgTotal( rtn['speed'] )
            if self.debug : print "speed(max=%.03fkm/h min=%.03fkm/h avg=%.03fkm/h)" % (rtn['speed_data']['max'], rtn['speed_data']['min'],rtn['speed_data']['avg'])
        if rtn.has_key( 'ele' ) :
            rtn['ele_data']  = self.getMaxMinAvgTotal( rtn['ele'] )
            if self.debug : print "elevation(max=%dm min=%dm avg=%.03fm)" % (rtn['ele_data']['max'], rtn['ele_data']['min'],rtn['ele_data']['avg'])
        
        return rtn


class SpeedGraph :

    def __init__(self, fname, mode, size=None, bgcol=None, basecol=None, gcol=None, debug=0):

        self.offset   = 30
        self.size     = (600,400)
        self.bgcol    = (255,255,255)
        self.basecol  = (237,234,229)
        self.gcol     = (221,0,0)
        self.lcol     = (0,0,0)
        self.lcol_sub = (224,224,224)
        self.im       = None
        self.debug    = debug
        self.x_unit   = {}
        self.y_unit   = {}
        self.fname    = fname
        self.mode     = mode

        if size : self.size = size
        if bgcol : self.bgcol = bgcol
        if basecol : self.basecol = basecol
        if gcol : self.gcol = gcol
 
        size = (self.size[0]+self.offset*2,self.size[1]+self.offset*2)
        self.im = Image.new( "RGB", size, self.basecol )


    def getFontImages( self ) :
        imglist = {}
        t = []
        
        tim = Image.open("font_image.png")
        size = ( tim.size[0], (tim.size[1] / 16) )
        imglist['size'] = size
        imglist['num'] = []

        if self.debug : print "getFontImages : crop unit(x=%d,y=%d)" % (size[0],size[1])

        for i in range(0,16) :
            step = i * size[1]
            region = tim.crop( ( 0,step,size[0],step+size[1]) )
            if i < 10 :
                imglist['num'].append( region )
            else :
                t.append( region )
        
        imglist['km_h'] = [t[4],t[0],t[1],t[2],t[3],t[5]]
        imglist['km'] = [t[4],t[0],t[1],t[5]]
        imglist['m'] = [t[4],t[1],t[5]]
        
        return imglist


    def getGraphBaseSquare( self, offset=0 ) :

        sq = []
        sq.append( (self.offset-offset,              self.offset-offset ) )
        sq.append( (self.size[0]+self.offset+offset, self.offset-offset ) )
        sq.append( (self.size[0]+self.offset+offset, self.size[1]+self.offset+offset) )
        sq.append( (self.offset-offset,              self.size[1]+self.offset+offset) )
        if self.debug : print "getGraphBaseSquare(%d) : %s" % ( offset, sq )
        return sq


    def disctionLineUnit( self, width, minls, maxv, disits ) :

        # width   = グラフエリアの最大幅(px)
        # minls   = ラインの最小間隔(px)
        # maxv    = 距離とかの最大値(メートル)
        # disits  = (list)最大桁リスト(10,100,1000,10000..) これも単位はメートル
        
        line_dist = []
        rtn_unit = {}

        f_width = float(width)
        f_maxv  = float(maxv)
        
        # graph max width(height) : ? px = max value : 1
        px_par_val = f_width / f_maxv
        
        lp = f_width / minls
#        if width % minls : lp += 1
        for i in disits :
            for j in (1,2,5) :
                line_dist.append( {'lval': i*j, 'ldigit': i, 'lstep': j, 'maxval': i*j*lp} )
        
        for i in line_dist :
            if i['maxval'] >= f_maxv :
                rtn_unit = i
                break
        # 1value当たりのpx
        if self.debug : print rtn_unit
        rtn_unit['px_par_val']= px_par_val

        # m or km ?
        if rtn_unit['ldigit'] >= 1000 :
            rtn_unit['lunit']  = 'km'
            rtn_unit['ldigit'] = rtn_unit['ldigit'] / 1000
        else :
            rtn_unit['lunit'] = 'm'

        del(rtn_unit['maxval'])
        del(rtn_unit['lval'])
        
        if self.debug : print rtn_unit
        return rtn_unit


    def makeGraphBase(self, dist, speed ) :

        draw  = ImageDraw.Draw(self.im)
        nfont = self.getFontImages( )
        imsize = nfont['size']

        # graph base square
        basebox = self.getGraphBaseSquare( 1 )
        draw.polygon( basebox, fill=self.lcol)
        basebox = self.getGraphBaseSquare( 0 )
        draw.polygon( basebox, fill=self.bgcol)

        org_xy = ( basebox[0][0], basebox[0][1] )
        max_xy = ( basebox[2][0], basebox[2][1] )

        #######################################################################
        # dist(m) line(max ...10000kmは対応してると思う)

        # 最小ライン幅決定(=fontの縦サイズ×千桁(4))
        # default = 6px * 4 = 24px
        min_ls_x = imsize[0] * 4 + 8
        min_ls_x += 8

        self.x_unit = self.disctionLineUnit( self.size[0], min_ls_x, dist, (10,100,1000,10000,100000) )

        str_int = 0
        px = 0
        unit_offset = 1000
        if self.x_unit['lunit'] == 'm' : unit_offset = 1
        while px < self.size[0] :
            x = eval("%d" % (org_xy[0] + (px+0.5)) )
            draw.line( [(x,org_xy[1]),(x,max_xy[1])], fill=self.lcol_sub )
            fontstr = "%s" % str_int
            for i in range(0,len(fontstr)) :
                if self.debug : print "%s %d" % (fontstr,eval(fontstr[i]))
                box = (x+(i*imsize[0]), max_xy[1]+3, x+(i*imsize[0])+imsize[0], max_xy[1]+3+imsize[1])
                self.im.paste( nfont['num'][eval(fontstr[i])], box )
            str_int = str_int + (self.x_unit['ldigit'] * self.x_unit['lstep'])
            px      = str_int * self.x_unit['px_par_val'] * unit_offset

        # put unit string [m] or [km]
        f_org_x = max_xy[0] + 3
        f_org_y = max_xy[1] + 3 + imsize[1]
        for i in range( 0,len( nfont[self.x_unit['lunit']] ) ) :
            box = (f_org_x + (i*imsize[0]), f_org_y, f_org_x + (i*imsize[0]) + imsize[0], f_org_y + imsize[1])
            self.im.paste( nfont[self.x_unit['lunit']][i], box )

        #######################################################################
        # km/h or 高度(m) line(max ...10000km/hは対応してると思う)

        # 最小ライン幅決定(=fontの縦サイズ)
        # default = 8px
        min_ls_y = imsize[1]
        min_ls_y += 13

        if self.mode == "speed" :
            if self.debug : print "mode speed graph"
            # 速度は%.fkm/hなので小数点以下丸め込みの上、メートルに変換して渡す
            self.y_unit = self.disctionLineUnit( self.size[1], min_ls_y, eval("%d" % speed) * 1000, (10000,100000) )
        else :
            if self.debug : print "mode elevation graph"
            # 高度はmなのでそのまま
            self.y_unit = self.disctionLineUnit( self.size[1], min_ls_y, speed, (10,100,1000,10000,100000) )
        
        str_int = 0
        px = 0
        unit_offset = 1000
        if self.y_unit['lunit'] == 'm' : unit_offset = 1
        while px < self.size[1] :
            y = eval("%d" % (max_xy[1] - px) )
            draw.line( [(org_xy[0],y),(max_xy[0],y)],fill=self.lcol)
            fontstr = "%s" % str_int
            if self.debug : print len(fontstr)
            for i in range(0,len(fontstr)) :
                offset = len(fontstr) - i - 1
                if self.debug : print "%s %d" % (fontstr,eval(fontstr[i]))
                f_org_x = org_xy[0] - 3 - offset*imsize[0] - imsize[0]
                f_org_y = y - imsize[1]
                box = ( f_org_x, f_org_y, f_org_x + imsize[0], f_org_y + imsize[1] )
                self.im.paste( nfont['num'][eval(fontstr[i])], box )
            str_int = str_int + (self.y_unit['ldigit'] * self.y_unit['lstep'])
            px      = str_int * self.y_unit['px_par_val'] * unit_offset

        # put unit string [km/h]
        f_org_x = org_xy[0] - imsize[0]*3 - 3
        f_org_y = org_xy[1] - 3 - (imsize[1]*2)
        if self.mode == "speed" :
            st_font = nfont['km_h']
        else :
            st_font = nfont[self.y_unit['lunit']]
        
        for i in range( 0,len( st_font ) ) :
            box = (f_org_x + (i*imsize[0]), f_org_y, f_org_x + (i*imsize[0]) + imsize[0], f_org_y+imsize[1] )
            self.im.paste( st_font[i], box )


    def drawLineGraph( self , x_list, y_list ) :

        draw  = ImageDraw.Draw(self.im)
        basebox = self.getGraphBaseSquare( 0 )
        
        org_xy = ( basebox[3][0], basebox[3][1] )
        max_xy = ( basebox[1][0], basebox[1][1] )
        
        # x = distance / y = speed
        # x = distance / y = ele
        dist_all = 0
        
        x_par = self.x_unit['px_par_val']
        y_par = self.y_unit['px_par_val']

        unit_offset = 1
        if self.mode == "speed" : unit_offset = 1000
        if self.debug : print "basebox : %s %s" % (org_xy, max_xy)
        if self.debug : print "1px/x(m)=%f 1px/y(m)=%f" % (x_par,y_par)
        for i in range( 1, len(x_list) ) :
            # print "len(x_list)=%d len(y_list)=%d x_list[%d]=%d y_list[%d]=%d" % (len(x_list),len(y_list),i,x_list[i],i,y_list[i])
            f_dist_1  = float(x_list[i-1])
            f_speed_1 = float(y_list[i-1])
            f_dist_2  = float(x_list[i])
            f_speed_2 = float(y_list[i])

            x_px1 = eval( "%d" % (x_par * dist_all) )
            y_px1 = eval( "%d" % (y_par * f_speed_1 * unit_offset) )

            x_px2 = eval( "%d" % (x_par * (dist_all+f_dist_2) ) )
            y_px2 = eval( "%d" % (y_par * f_speed_2 * unit_offset) )

            xy = [( org_xy[0] + x_px1, org_xy[1] - y_px1), (org_xy[0] + x_px2, org_xy[1] - y_px2)]
            #if self.debug : print "dist1=%d,dist2=%d" % (dist_all, dist_all+f_dist_2)
            #if self.debug : print "x=(%.3f->%.3f y=%.3f->%.3f (%d,%d -> %d,%d)" % (f_dist_1,f_dist_2,f_speed_1,f_speed_2,x_px1,y_px1,x_px2,y_px2)
            draw.line( xy, fill=self.gcol)
            dist_all += f_dist_2
        
    def end( self ):
        self.im.save( self.fname ,"PNG")

def process_file(file):

    gpx = GPXFileData( debug=1 )
    try:
        doc = FromXmlStream(file)
    except:
        print "[WARNING!] gpx file was broken [WARNING!]"
        return None
    
    gpx.makeLogData( doc )
    d = gpx.getLogData( )
    
    rundata = gpx.getRunningData( )
    
    rundata['dist'].insert(0, 0)
    if rundata.has_key( 'time' ) :
        rundata['speed'].insert(0, 0)
        gs = SpeedGraph( fname=d['name']+"_speed.png",mode="speed", debug=1)
        gs.makeGraphBase( rundata['dist_data']['total'], rundata['speed_data']['max'] )
        gs.drawLineGraph( rundata['dist'], rundata['speed'] )
        gs.end()
    
    if rundata.has_key( 'ele' ) :
        rundata['ele'].insert(0, 0)
        ge = SpeedGraph( fname=d['name']+"_ele.png",mode="ele", debug=1)
        ge.makeGraphBase( rundata['dist_data']['total'], rundata['ele_data']['max'] )
        ge.drawLineGraph( rundata['dist'], rundata['ele'] )
        ge.end()
    
    return gpx.getGoogleJSList()


# library test/debug function (dump given files)
if __name__ == '__main__':
    import sys
    import gc
    import time
    gc.enable()
    start_time = time.clock()
    start = len(gc.get_objects())
    print "%s object at the beginning" % start

    if len(sys.argv) < 2:
        print 'Usage: %s files...\n' % sys.argv[0]
        sys.exit(0)

    for filename in sys.argv[1:]:
        try:
            file=open(filename, 'r')
        except:
            print filename, 'unreadable'
            print
            continue
        print filename+':'
        data=process_file(file)
        if data:
            print len(data)
#            print data
        else :
            print 'No GPX information found'
        file.close()

    end_time = time.clock()
    duration = end_time - start_time
    end = len(gc.get_objects())
    print ""
    print "---------------------------------------------"
    print "%s object at the end" % end
    print "so %s object have been created for one photo" % (end-start)
    print "---------------------------------------------"
    print "Time needed : %s" % duration