finish hudcache rebuild code which speeds up bulk imports nicely - turn off permanently via allow_hudcache_rebuild in fpdb_import.py. Also some more moves into Database.py and cosmetic stuff

This commit is contained in:
sqlcoder 2009-07-21 22:26:23 +01:00
parent ab413faab9
commit f69281e2fd
5 changed files with 159 additions and 117 deletions

View File

@ -27,7 +27,12 @@ Create and manage the database objects.
import sys
import traceback
from datetime import datetime, date, time, timedelta
from time import time, strftime
import string
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE
#ISOLATION_LEVEL_AUTOCOMMIT = 0
#ISOLATION_LEVEL_READ_COMMITTED = 1
#ISOLATION_LEVEL_SERIALIZABLE = 2
# pyGTK modules
@ -39,6 +44,11 @@ import SQL
import Card
class Database:
MYSQL_INNODB = 2
PGSQL = 3
SQLITE = 4
def __init__(self, c, db_name = None, game = None, sql = None): # db_name and game not used any more
print "\ncreating Database instance, sql =", sql
self.fdb = fpdb_db.fpdb_db() # sets self.fdb.db self.fdb.cursor and self.fdb.sql
@ -55,7 +65,6 @@ class Database:
self.sql = SQL.Sql(type = self.type, db_server = db_params['db-server'])
else:
self.sql = sql
self.connection.rollback()
# To add to config:
self.hud_style = 'T' # A=All-time
@ -99,8 +108,11 @@ class Database:
#row = self.cursor.fetchone()
else:
print "Bailing on DB query, not sure it exists yet"
self.saveActions = False if self.import_options['saveActions'] == False else True
self.connection.rollback() # make sure any locks taken so far are released
# could be used by hud to change hud style
def set_hud_style(self, style):
self.hud_style = style
@ -114,6 +126,9 @@ class Database:
def rollback(self):
self.fdb.db.rollback()
def get_cursor(self):
return self.connection.cursor()
def close_connection(self):
self.connection.close()
@ -331,7 +346,7 @@ class Database:
,start_cashes, antes, card_values
,card_suits, winnings, rakes, seatNos)
if 'updateHudCache' not in settings or settings['updateHudCache'] != 'drop':
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
fpdb_simple.storeHudCache(self.backend, cursor, base, category, gametype_id, hand_start_time, player_ids, hudImportData)
if self.saveActions:
@ -362,7 +377,7 @@ class Database:
, positions, card_values, card_suits, winnings, rakes, seatNos, hudImportData)
t4 = time()
#print "ring holdem, backend=%d" % backend
if 'updateHudCache' not in settings or settings['updateHudCache'] != 'drop':
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
fpdb_simple.storeHudCache(self.backend, cursor, base, category, gametype_id, hand_start_time, player_ids, hudImportData)
t5 = time()
t6 = time()
@ -396,7 +411,7 @@ class Database:
, card_values, card_suits, winnings, rakes, seatNos, tourneys_players_ids)
#print "tourney holdem, backend=%d" % backend
if 'updateHudCache' not in settings or settings['updateHudCache'] != 'drop':
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
fpdb_simple.storeHudCache(self.backend, cursor, base, category, gametype_id, hand_start_time, player_ids, hudImportData)
if self.saveActions:
@ -423,7 +438,7 @@ class Database:
, playerIds, startCashes, antes, cardValues, cardSuits
, winnings, rakes, seatNos, tourneys_players_ids)
if 'updateHudCache' not in settings or settings['updateHudCache'] != 'drop':
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
fpdb_simple.storeHudCache(self.backend, cursor, base, category, gametypeId, hand_start_time, playerIds, hudImportData)
if self.saveActions:
@ -431,6 +446,38 @@ class Database:
return hands_id
#end def tourney_stud
def rebuild_hudcache(self):
"""clears hudcache and rebuilds from the individual handsplayers records"""
stime = time()
self.connection.cursor().execute(self.sql.query['clearHudCache'])
self.connection.cursor().execute(self.sql.query['rebuildHudCache'])
self.commit()
print "Rebuild hudcache took %.1f seconds" % (time() - stime,)
#end def rebuild_hudcache
def analyzeDB(self):
"""Do whatever the DB can offer to update index/table statistics"""
stime = time()
if self.backend == self.MYSQL_INNODB:
try:
self.cursor.execute(self.sql.query['analyze'])
except:
print "Error during analyze"
elif self.backend == self.PGSQL:
self.connection.set_isolation_level(0) # allow vacuum to work
try:
self.cursor = self.get_cursor()
self.cursor.execute(self.sql.query['analyze'])
except:
print "Error during analyze:", str(sys.exc_value)
self.connection.set_isolation_level(1) # go back to normal isolation level
self.commit()
atime = time() - stime
print "Analyze took %.1f seconds" % (atime,)
#end def analyzeDB
if __name__=="__main__":
c = Configuration.Config()

View File

@ -205,10 +205,11 @@ class GuiBulkImport():
self.load_button.show()
# see how many hands are in the db and adjust accordingly
tcursor = self.importer.fdb.db.cursor()
tcursor = self.importer.database.cursor
tcursor.execute("Select count(1) from Hands")
row = tcursor.fetchone()
tcursor.close()
self.importer.database.rollback()
self.n_hands_in_db = row[0]
if self.n_hands_in_db == 0:
self.cb_dropindexes.set_active(2)

View File

@ -1344,58 +1344,58 @@ class Sql:
,count(1)
,sum(wonWhenSeenStreet1)
,sum(wonAtSD)
,sum(CAST(street0VPI as integer))
,sum(CAST(street0Aggr as integer))
,sum(CAST(street0_3BChance as integer))
,sum(CAST(street0_3BDone as integer))
,sum(CAST(street1Seen as integer))
,sum(CAST(street2Seen as integer))
,sum(CAST(street3Seen as integer))
,sum(CAST(street4Seen as integer))
,sum(CAST(sawShowdown as integer))
,sum(CAST(street1Aggr as integer))
,sum(CAST(street2Aggr as integer))
,sum(CAST(street3Aggr as integer))
,sum(CAST(street4Aggr as integer))
,sum(CAST(otherRaisedStreet1 as integer))
,sum(CAST(otherRaisedStreet2 as integer))
,sum(CAST(otherRaisedStreet3 as integer))
,sum(CAST(otherRaisedStreet4 as integer))
,sum(CAST(foldToOtherRaisedStreet1 as integer))
,sum(CAST(foldToOtherRaisedStreet2 as integer))
,sum(CAST(foldToOtherRaisedStreet3 as integer))
,sum(CAST(foldToOtherRaisedStreet4 as integer))
,sum(CAST(stealAttemptChance as integer))
,sum(CAST(stealAttempted as integer))
,sum(CAST(foldBbToStealChance as integer))
,sum(CAST(foldedBbToSteal as integer))
,sum(CAST(foldSbToStealChance as integer))
,sum(CAST(foldedSbToSteal as integer))
,sum(CAST(street1CBChance as integer))
,sum(CAST(street1CBDone as integer))
,sum(CAST(street2CBChance as integer))
,sum(CAST(street2CBDone as integer))
,sum(CAST(street3CBChance as integer))
,sum(CAST(street3CBDone as integer))
,sum(CAST(street4CBChance as integer))
,sum(CAST(street4CBDone as integer))
,sum(CAST(foldToStreet1CBChance as integer))
,sum(CAST(foldToStreet1CBDone as integer))
,sum(CAST(foldToStreet2CBChance as integer))
,sum(CAST(foldToStreet2CBDone as integer))
,sum(CAST(foldToStreet3CBChance as integer))
,sum(CAST(foldToStreet3CBDone as integer))
,sum(CAST(foldToStreet4CBChance as integer))
,sum(CAST(foldToStreet4CBDone as integer))
,sum(CAST(totalProfit as integer))
,sum(CAST(street1CheckCallRaiseChance as integer))
,sum(CAST(street1CheckCallRaiseDone as integer))
,sum(CAST(street2CheckCallRaiseChance as integer))
,sum(CAST(street2CheckCallRaiseDone as integer))
,sum(CAST(street3CheckCallRaiseChance as integer))
,sum(CAST(street3CheckCallRaiseDone as integer))
,sum(CAST(street4CheckCallRaiseChance as integer))
,sum(CAST(street4CheckCallRaiseDone as integer))
,sum(street0VPI)
,sum(street0Aggr)
,sum(street0_3BChance)
,sum(street0_3BDone)
,sum(street1Seen)
,sum(street2Seen)
,sum(street3Seen)
,sum(street4Seen)
,sum(sawShowdown)
,sum(street1Aggr)
,sum(street2Aggr)
,sum(street3Aggr)
,sum(street4Aggr)
,sum(otherRaisedStreet1)
,sum(otherRaisedStreet2)
,sum(otherRaisedStreet3)
,sum(otherRaisedStreet4)
,sum(foldToOtherRaisedStreet1)
,sum(foldToOtherRaisedStreet2)
,sum(foldToOtherRaisedStreet3)
,sum(foldToOtherRaisedStreet4)
,sum(stealAttemptChance)
,sum(stealAttempted)
,sum(foldBbToStealChance)
,sum(foldedBbToSteal)
,sum(foldSbToStealChance)
,sum(foldedSbToSteal)
,sum(street1CBChance)
,sum(street1CBDone)
,sum(street2CBChance)
,sum(street2CBDone)
,sum(street3CBChance)
,sum(street3CBDone)
,sum(street4CBChance)
,sum(street4CBDone)
,sum(foldToStreet1CBChance)
,sum(foldToStreet1CBDone)
,sum(foldToStreet2CBChance)
,sum(foldToStreet2CBDone)
,sum(foldToStreet3CBChance)
,sum(foldToStreet3CBDone)
,sum(foldToStreet4CBChance)
,sum(foldToStreet4CBDone)
,sum(totalProfit)
,sum(street1CheckCallRaiseChance)
,sum(street1CheckCallRaiseDone)
,sum(street2CheckCallRaiseChance)
,sum(street2CheckCallRaiseDone)
,sum(street3CheckCallRaiseChance)
,sum(street3CheckCallRaiseDone)
,sum(street4CheckCallRaiseChance)
,sum(street4CheckCallRaiseDone)
FROM HandsPlayers hp
INNER JOIN Hands h ON (h.id = hp.handId)
GROUP BY h.gametypeId
@ -1554,6 +1554,14 @@ class Sql:
,to_char(h.handStart, 'YYMMDD')
"""
if db_server == 'mysql':
self.query['analyze'] = """
analyze table autorates, gametypes, hands, handsplayers, hudcache, players
, settings, sites, tourneys, tourneysplayers, tourneytypes
"""
else: # assume postgres
self.query['analyze'] = "vacuum analyze"
if __name__== "__main__":
# just print the default queries and exit
s = Sql(game = 'razz', type = 'ptracks')

View File

@ -572,52 +572,6 @@ class fpdb_db:
self.db.set_isolation_level(1) # go back to normal isolation level
#end def dropAllIndexes
def analyzeDB(self):
"""Do whatever the DB can offer to update index/table statistics"""
stime = time()
if self.backend == self.PGSQL:
self.db.set_isolation_level(0) # allow vacuum to work
try:
self.cursor.execute("vacuum analyze")
except:
print "Error during vacuum"
self.db.set_isolation_level(1) # go back to normal isolation level
self.db.commit()
atime = time() - stime
print "analyze took", atime, "seconds"
#end def analyzeDB
# Currently uses an exclusive lock on the Players table as a global lock
# ( Changed because Hands is used in Database.init() )
# Return values are Unix style, 0 for success, positive integers for errors
# 1 = generic error
# 2 = players table does not exist (error message is suppressed)
def get_global_lock(self):
if self.backend == self.MYSQL_INNODB:
try:
self.cursor.execute( "lock tables Players write" )
except:
# Table 'fpdb.players' doesn't exist
if str(sys.exc_value).find(".Players' doesn't exist") >= 0:
return(2)
print "Error! failed to obtain global lock. Close all programs accessing " \
+ "database (including fpdb) and try again (%s)." \
% ( str(sys.exc_value).rstrip('\n'), )
return(1)
elif self.backend == self.PGSQL:
try:
self.cursor.execute( "lock table Players in exclusive mode nowait" )
#print "... after lock table, status =", self.cursor.statusmessage
except:
# relation "players" does not exist
if str(sys.exc_value).find('relation "players" does not exist') >= 0:
return(2)
print "Error! failed to obtain global lock. Close all programs accessing " \
+ "database (including fpdb) and try again (%s)." \
% ( str(sys.exc_value).rstrip('\n'), )
return(1)
return(0)
def getLastInsertId(self):
if self.backend == self.MYSQL_INNODB:
ret = self.db.insert_id()

View File

@ -81,9 +81,10 @@ class Importer:
self.database = Database.Database(self.config) # includes .connection and .sql variables
self.fdb = fpdb_db.fpdb_db() # sets self.fdb.db self.fdb.cursor and self.fdb.sql
self.fdb.do_connect(self.config)
self.fdb.db.rollback()
self.fdb.db.rollback() # make sure all locks are released
self.NEWIMPORT = False
self.allow_hudcache_rebuild = True;
#Set functions
def setCallHud(self, value):
@ -168,13 +169,19 @@ class Importer:
def runImport(self):
""""Run full import on self.filelist."""
start = datetime.datetime.now()
print "started at", start, "--", len(self.filelist), "files to import.", self.settings['dropIndexes']
print "Started at", start, "--", len(self.filelist), "files to import.", self.settings['dropIndexes']
if self.settings['dropIndexes'] == 'auto':
self.settings['dropIndexes'] = self.calculate_auto2(10.0, 500.0)
self.settings['dropIndexes'] = self.calculate_auto2(12.0, 500.0)
if self.allow_hudcache_rebuild:
self.settings['dropHudCache'] = self.calculate_auto2(25.0, 500.0) # returns "drop"/"don't drop"
if self.settings['dropIndexes'] == 'drop':
self.fdb.prepareBulkImport()
#self.settings['updateHudCache'] = self.calculate_auto2(10.0, 500.0)
else:
print "No need drop indexes."
#print "dropInd =", self.settings['dropIndexes'], " dropHudCache =", self.settings['dropHudCache']
totstored = 0
totdups = 0
totpartial = 0
@ -190,7 +197,13 @@ class Importer:
tottime += ttime
if self.settings['dropIndexes'] == 'drop':
self.fdb.afterBulkImport()
self.fdb.analyzeDB()
else:
print "No need rebuild indexes."
if self.settings['dropHudCache'] == 'drop':
self.database.rebuild_hudcache()
else:
print "No need to rebuild hudcache."
self.database.analyzeDB()
return (totstored, totdups, totpartial, toterrors, tottime)
# else: import threaded
@ -237,11 +250,12 @@ class Importer:
# if hands_in_db is zero or very low, we want to drop indexes, otherwise compare
# import size with db size somehow:
#print "auto2: handsindb =", self.settings['handsInDB'], "total_size =", total_size, "size_per_hand =", \
# size_per_hand, "inc =", increment
ret = "don't drop"
if self.settings['handsInDB'] < scale * (total_size/size_per_hand) + increment:
return "drop"
return "don't drop"
ret = "drop"
#print "auto2: handsindb =", self.settings['handsInDB'], "total_size =", total_size, "size_per_hand =", \
# size_per_hand, "inc =", increment, "return:", ret
return ret
#Run import on updated files, then store latest update time.
def runUpdated(self):
@ -309,6 +323,24 @@ class Importer:
filter_name = filter.replace("ToFpdb", "")
# Example code for using threads & queues: (maybe for obj and import_fpdb_file??)
#def worker():
# while True:
# item = q.get()
# do_work(item)
# q.task_done()
#
#q = Queue()
#for i in range(num_worker_threads):
# t = Thread(target=worker)
# t.setDaemon(True)
# t.start()
#
#for item in source():
# q.put(item)
#
#q.join() # block until all tasks are done
mod = __import__(filter)
obj = getattr(mod, filter_name, None)
if callable(obj):
@ -317,12 +349,12 @@ class Importer:
(stored, duplicates, partial, errors, ttime) = self.import_fpdb_file(out_path, site)
elif (conv.getStatus() and self.NEWIMPORT == True):
#This code doesn't do anything yet
handlist = conv.getProcessedHands()
self.pos_in_file[file] = conv.getLastCharacterRead()
handlist = hhc.getProcessedHands()
self.pos_in_file[file] = hhc.getLastCharacterRead()
for hand in handlist:
hand.prepInsert(self.fdb)
hand.insert(self.fdb)
hand.prepInsert()
hand.insert()
else:
# conversion didn't work
# TODO: appropriate response?