add db writer threading

This commit is contained in:
sqlcoder 2009-07-31 21:24:21 +01:00
parent 36de79140c
commit b503626c2b
3 changed files with 310 additions and 201 deletions

View File

@ -31,6 +31,7 @@ from time import time, strftime
import string import string
import re import re
import logging import logging
import Queue
# pyGTK modules # pyGTK modules
@ -469,23 +470,19 @@ class Database:
,action_types, allIns, action_amounts, actionNos, hudImportData, maxSeats, tableName ,action_types, allIns, action_amounts, actionNos, hudImportData, maxSeats, tableName
,seatNos): ,seatNos):
try: fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits)
fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits)
hands_id = self.storeHands(self.backend, site_hand_no, gametype_id hands_id = self.storeHands(self.backend, site_hand_no, gametype_id
,hand_start_time, names, tableName, maxSeats, hudImportData ,hand_start_time, names, tableName, maxSeats, hudImportData
,(None, None, None, None, None), (None, None, None, None, None)) ,(None, None, None, None, None), (None, None, None, None, None))
#print "before calling store_hands_players_stud, antes:", antes #print "before calling store_hands_players_stud, antes:", antes
hands_players_ids = self.store_hands_players_stud(self.backend, hands_id, player_ids hands_players_ids = self.store_hands_players_stud(self.backend, hands_id, player_ids
,start_cashes, antes, card_values ,start_cashes, antes, card_values
,card_suits, winnings, rakes, seatNos) ,card_suits, winnings, rakes, seatNos)
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData)
except:
print "ring_stud error: " + str(sys.exc_value) # in case exception doesn't get printed
raise fpdb_simple.FpdbError("ring_stud error: " + str(sys.exc_value))
return hands_id return hands_id
#end def ring_stud #end def ring_stud
@ -496,30 +493,26 @@ class Database:
,action_amounts, actionNos, hudImportData, maxSeats, tableName, seatNos): ,action_amounts, actionNos, hudImportData, maxSeats, tableName, seatNos):
"""stores a holdem/omaha hand into the database""" """stores a holdem/omaha hand into the database"""
try: t0 = time()
t0 = time() #print "in ring_holdem_omaha"
#print "in ring_holdem_omaha" fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits)
fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) t1 = time()
t1 = time() fpdb_simple.fill_board_cards(board_values, board_suits)
fpdb_simple.fill_board_cards(board_values, board_suits) t2 = time()
t2 = time()
hands_id = self.storeHands(self.backend, site_hand_no, gametype_id hands_id = self.storeHands(self.backend, site_hand_no, gametype_id
,hand_start_time, names, tableName, maxSeats ,hand_start_time, names, tableName, maxSeats
,hudImportData, board_values, board_suits) ,hudImportData, board_values, board_suits)
#TEMPORARY CALL! - Just until all functions are migrated #TEMPORARY CALL! - Just until all functions are migrated
t3 = time() t3 = time()
hands_players_ids = self.store_hands_players_holdem_omaha( hands_players_ids = self.store_hands_players_holdem_omaha(
self.backend, category, hands_id, player_ids, start_cashes self.backend, category, hands_id, player_ids, start_cashes
, positions, card_values, card_suits, winnings, rakes, seatNos, hudImportData) , positions, card_values, card_suits, winnings, rakes, seatNos, hudImportData)
t4 = time() t4 = time()
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData)
t5 = time() t5 = time()
#print "fills=(%4.3f) saves=(%4.3f,%4.3f,%4.3f)" % (t2-t0, t3-t2, t4-t3, t5-t4) #print "fills=(%4.3f) saves=(%4.3f,%4.3f,%4.3f)" % (t2-t0, t3-t2, t4-t3, t5-t4)
except:
print "ring_holdem_omaha error: " + str(sys.exc_value) # in case exception doesn't get printed
raise fpdb_simple.FpdbError("ring_holdem_omaha error: " + str(sys.exc_value))
return hands_id return hands_id
#end def ring_holdem_omaha #end def ring_holdem_omaha
@ -532,28 +525,24 @@ class Database:
,actionNos, hudImportData, maxSeats, tableName, seatNos): ,actionNos, hudImportData, maxSeats, tableName, seatNos):
"""stores a tourney holdem/omaha hand into the database""" """stores a tourney holdem/omaha hand into the database"""
try: fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits)
fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) fpdb_simple.fill_board_cards(board_values, board_suits)
fpdb_simple.fill_board_cards(board_values, board_suits)
tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourney_start) tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourney_start)
tourneys_players_ids = self.store_tourneys_players(tourney_id, player_ids, payin_amounts, ranks, winnings) tourneys_players_ids = self.store_tourneys_players(tourney_id, player_ids, payin_amounts, ranks, winnings)
hands_id = self.storeHands(self.backend, site_hand_no, gametype_id hands_id = self.storeHands(self.backend, site_hand_no, gametype_id
,hand_start_time, names, tableName, maxSeats ,hand_start_time, names, tableName, maxSeats
,hudImportData, board_values, board_suits) ,hudImportData, board_values, board_suits)
hands_players_ids = self.store_hands_players_holdem_omaha_tourney( hands_players_ids = self.store_hands_players_holdem_omaha_tourney(
self.backend, category, hands_id, player_ids, start_cashes, positions self.backend, category, hands_id, player_ids, start_cashes, positions
, card_values, card_suits, winnings, rakes, seatNos, tourneys_players_ids , card_values, card_suits, winnings, rakes, seatNos, tourneys_players_ids
, hudImportData) , hudImportData)
#print "tourney holdem, backend=%d" % backend #print "tourney holdem, backend=%d" % backend
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData)
except:
print "tourney_holdem_omaha error: " + str(sys.exc_value) # in case exception doesn't get printed
raise fpdb_simple.FpdbError("tourney_holdem_omaha error: " + str(sys.exc_value))
return hands_id return hands_id
#end def tourney_holdem_omaha #end def tourney_holdem_omaha
@ -565,26 +554,22 @@ class Database:
,actionNos, hudImportData, maxSeats, tableName, seatNos): ,actionNos, hudImportData, maxSeats, tableName, seatNos):
#stores a tourney stud/razz hand into the database #stores a tourney stud/razz hand into the database
try: fpdb_simple.fillCardArrays(len(names), base, category, cardValues, cardSuits)
fpdb_simple.fillCardArrays(len(names), base, category, cardValues, cardSuits)
tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourneyStartTime) tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourneyStartTime)
tourneys_players_ids = self.store_tourneys_players(tourney_id, playerIds, payin_amounts, ranks, winnings) tourneys_players_ids = self.store_tourneys_players(tourney_id, playerIds, payin_amounts, ranks, winnings)
hands_id = self.storeHands( self.backend, siteHandNo, gametypeId hands_id = self.storeHands( self.backend, siteHandNo, gametypeId
, handStartTime, names, tableName, maxSeats , handStartTime, names, tableName, maxSeats
, hudImportData, board_values, board_suits ) , hudImportData, board_values, board_suits )
hands_players_ids = self.store_hands_players_stud_tourney(self.backend, hands_id hands_players_ids = self.store_hands_players_stud_tourney(self.backend, hands_id
, playerIds, startCashes, antes, cardValues, cardSuits , playerIds, startCashes, antes, cardValues, cardSuits
, winnings, rakes, seatNos, tourneys_players_ids) , winnings, rakes, seatNos, tourneys_players_ids)
if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop':
self.storeHudCache(self.backend, base, category, gametypeId, hand_start_time, playerIds, hudImportData) self.storeHudCache(self.backend, base, category, gametypeId, hand_start_time, playerIds, hudImportData)
except:
print "tourney_stud error: " + str(sys.exc_value) # in case exception doesn't get printed
raise fpdb_simple.FpdbError("tourney_stud error: " + str(sys.exc_value))
return hands_id return hands_id
#end def tourney_stud #end def tourney_stud
@ -613,7 +598,7 @@ class Database:
"AND referenced_column_name = %s ", "AND referenced_column_name = %s ",
(fk['fktab'], fk['fkcol'], fk['rtab'], fk['rcol']) ) (fk['fktab'], fk['fkcol'], fk['rtab'], fk['rcol']) )
cons = c.fetchone() cons = c.fetchone()
print "preparebulk find fk: cons=", cons #print "preparebulk find fk: cons=", cons
if cons: if cons:
print "dropping mysql fk", cons[0], fk['fktab'], fk['fkcol'] print "dropping mysql fk", cons[0], fk['fktab'], fk['fkcol']
try: try:
@ -994,7 +979,7 @@ class Database:
result = self.tourney_holdem_omaha( result = self.tourney_holdem_omaha(
h.config, h.settings, h.base, h.category, h.siteTourneyNo, h.buyin h.config, h.settings, h.base, h.category, h.siteTourneyNo, h.buyin
, h.fee, h.knockout, h.entries, h.prizepool, h.tourneyStartTime , h.fee, h.knockout, h.entries, h.prizepool, h.tourneyStartTime
, h.payin_amounts, h.ranks, h.tourneyTypeId, h.siteID, h.siteHandNo , payin_amounts, ranks, h.tourneyTypeId, h.siteID, h.siteHandNo
, h.gametypeID, h.handStartTime, h.names, h.playerIDs, h.startCashes , h.gametypeID, h.handStartTime, h.names, h.playerIDs, h.startCashes
, h.positions, h.cardValues, h.cardSuits, h.boardValues, h.boardSuits , h.positions, h.cardValues, h.cardSuits, h.boardValues, h.boardSuits
, h.winnings, h.rakes, h.actionTypes, h.allIns, h.actionAmounts , h.winnings, h.rakes, h.actionTypes, h.allIns, h.actionAmounts
@ -1003,13 +988,13 @@ class Database:
result = self.tourney_stud( result = self.tourney_stud(
h.config, h.settings, h.base, h.category, h.siteTourneyNo h.config, h.settings, h.base, h.category, h.siteTourneyNo
, h.buyin, h.fee, h.knockout, h.entries, h.prizepool, h.tourneyStartTime , h.buyin, h.fee, h.knockout, h.entries, h.prizepool, h.tourneyStartTime
, h.payin_amounts, h.ranks, h.tourneyTypeId, h.siteID, h.siteHandNo , payin_amounts, ranks, h.tourneyTypeId, h.siteID, h.siteHandNo
, h.gametypeID, h.handStartTime, h.names, h.playerIDs, h.startCashes , h.gametypeID, h.handStartTime, h.names, h.playerIDs, h.startCashes
, h.antes, h.cardValues, h.cardSuits, h.winnings, h.rakes, h.actionTypes , h.antes, h.cardValues, h.cardSuits, h.winnings, h.rakes, h.actionTypes
, h.allIns, h.actionAmounts, h.actionNos, h.hudImportData, h.maxSeats , h.allIns, h.actionAmounts, h.actionNos, h.hudImportData, h.maxSeats
, h.tableName, h.seatNos) , h.tableName, h.seatNos)
else: else:
raise fpself.simple.Fpself.rror("unrecognised category") raise fpdb_simple.FpdbError("unrecognised category")
else: else:
if h.base == "hold": if h.base == "hold":
result = self.ring_holdem_omaha( result = self.ring_holdem_omaha(
@ -1027,11 +1012,13 @@ class Database:
, h.actionAmounts, h.actionNos, h.hudImportData, h.maxSeats, h.tableName , h.actionAmounts, h.actionNos, h.hudImportData, h.maxSeats, h.tableName
, h.seatNos) , h.seatNos)
else: else:
raise fpself.simple.Fpself.rror ("unrecognised category") raise fpdb_simple.FpdbError("unrecognised category")
self.commit()
except: except:
print "Error storing hand: " + str(sys.exc_value) print "Error storing hand: " + str(sys.exc_value)
self.rollback() self.rollback()
# re-raise the exception so that the calling routine can decide what to do:
# (e.g. a write thread might try again)
raise
return result return result
#end def store_the_hand #end def store_the_hand
@ -1576,8 +1563,85 @@ class Database:
#end def store_tourneys_players #end def store_tourneys_players
# read HandToWrite objects from q and insert into database
def insert_queue_hands(self, q, maxwait=10, commitEachHand=True):
n,fails,maxTries,firstWait = 0,0,4,0.1
sendFinal = False
t0 = time()
while True:
try:
h = q.get(True) # (True,maxWait) has probs if 1st part of import is all dups
except Queue.Empty:
# Queue.Empty exception thrown if q was empty for
# if q.empty() also possible - no point if testing for Queue.Empty exception
# maybe increment a counter and only break after a few times?
# could also test threading.active_count() or look through threading.enumerate()
# so break immediately if no threads, but count up to X exceptions if a writer
# thread is still alive???
print "queue empty too long - writer stopping ..."
break
except:
print "writer stopping, error reading queue: " + str(sys.exc_info())
break
#print "got hand", str(h.get_finished())
tries,wait,again = 0,firstWait,True
while again:
try:
again = False # set this immediately to avoid infinite loops!
if h.get_finished():
# all items on queue processed
sendFinal = True
else:
self.store_the_hand(h)
# optional commit, could be every hand / every N hands / every time a
# commit message received?? mark flag to indicate if commits outstanding
if commitEachHand:
self.commit()
n = n + 1
except:
#print "iqh store error", sys.exc_value # debug
self.rollback()
if re.search('deadlock', str(sys.exc_info()[1]), re.I):
# deadlocks only a problem if hudcache is being updated
tries = tries + 1
if tries < maxTries and wait < 5: # wait < 5 just to make sure
print "deadlock detected - trying again ..."
time.sleep(wait)
wait = wait + wait
again = True
else:
print "too many deadlocks - failed to store hand " + h.get_siteHandNo()
if not again:
fails = fails + 1
err = traceback.extract_tb(sys.exc_info()[2])[-1]
print "***Error storing hand: "+err[2]+"("+str(err[1])+"): "+str(sys.exc_info()[1])
# finished trying to store hand
# always reduce q count, whether or not this hand was saved ok
q.task_done()
# while True loop
self.commit()
if sendFinal:
q.task_done()
print "db writer finished: stored %d hands (%d fails) in %.1f seconds" % (n, fails, time()-t0)
# end def insert_queue_hands():
def send_finish_msg(self, q):
try:
h = HandToWrite(True)
q.put(h)
except:
err = traceback.extract_tb(sys.exc_info()[2])[-1]
print "***Error sending finish: "+err[2]+"("+str(err[1])+"): "+str(sys.exc_info()[1])
# end def send_finish_msg():
# Class used to hold all the data needed to write a hand to the db # Class used to hold all the data needed to write a hand to the db
# mainParser() in fpdb_parse_logic.py creates one of these and then passes it to # mainParser() in fpdb_parse_logic.py creates one of these and then passes it to
# self.insert_queue_hands()
class HandToWrite: class HandToWrite:
@ -1675,49 +1739,6 @@ class HandToWrite:
raise raise
# end def set_hand # end def set_hand
def set_ring_holdem_omaha( self, config, settings, base, category, siteHandNo
, gametypeID, handStartTime, names, playerIDs
, startCashes, positions, cardValues, cardSuits
, boardValues, boardSuits, winnings, rakes
, actionTypes, allIns, actionAmounts, actionNos
, hudImportData, maxSeats, tableName, seatNos ):
self.config = config
self.settings = settings
self.base = base
self.category = category
self.siteHandNo = siteHandNo
self.gametypeID = gametypeID
self.handStartTime = handStartTime
self.names = names
self.playerIDs = playerIDs
self.startCashes = startCashes
self.positions = positions
self.cardValues = cardValues
self.cardSuits = cardSuits
self.boardValues = boardValues
self.boardSuits = boardSuits
self.winnings = winnings
self.rakes = rakes
self.actionTypes = actionTypes
self.allIns = allIns
self.actionAmounts = actionAmounts
self.actionNos = actionNos
self.hudImportData = hudImportData
self.maxSeats = maxSeats
self.tableName = tableName
self.seatNos = seatNos
# end def set_ring_holdem_omaha
def send_ring_holdem_omaha(self, db):
result = db.ring_holdem_omaha(
self.config, self.settings, self.base, self.category, self.siteHandNo
, self.gametypeID, self.handStartTime, self.names, self.playerIDs
, self.startCashes, self.positions, self.cardValues, self.cardSuits
, self.boardValues, self.boardSuits, self.winnings, self.rakes
, self.actionTypes, self.allIns, self.actionAmounts, self.actionNos
, self.hudImportData, self.maxSeats, self.tableName, self.seatNos)
# end def send_ring_holdem_omaha
def get_finished(self): def get_finished(self):
return( self.finished ) return( self.finished )
# end def get_finished # end def get_finished

View File

@ -21,12 +21,15 @@
import os # todo: remove this once import_dir is in fpdb_import import os # todo: remove this once import_dir is in fpdb_import
import sys import sys
from time import time, strftime from time import time, strftime, sleep
import logging import logging
import traceback import traceback
import math import math
import datetime import datetime
import re import re
import Queue
from collections import deque # using Queue for now
import threading
# fpdb/FreePokerTools modules # fpdb/FreePokerTools modules
@ -42,11 +45,11 @@ try:
mysqlLibFound=True mysqlLibFound=True
except: except:
pass pass
try: try:
import psycopg2 import psycopg2
pgsqlLibFound=True pgsqlLibFound=True
import psycopg2.extensions import psycopg2.extensions
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
except: except:
@ -61,7 +64,6 @@ class Importer:
self.config = config self.config = config
self.sql = sql self.sql = sql
self.database = None # database will be the main db interface eventually
self.filelist = {} self.filelist = {}
self.dirlist = {} self.dirlist = {}
self.siteIds = {} self.siteIds = {}
@ -74,14 +76,24 @@ class Importer:
self.pos_in_file = {} # dict to remember how far we have read in the file self.pos_in_file = {} # dict to remember how far we have read in the file
#Set defaults #Set defaults
self.callHud = self.config.get_import_parameters().get("callFpdbHud") self.callHud = self.config.get_import_parameters().get("callFpdbHud")
# CONFIGURATION OPTIONS - update allowHudcacheRebuild and forceThreads for faster imports
self.settings.setdefault("minPrint", 30) self.settings.setdefault("minPrint", 30)
self.settings.setdefault("handCount", 0) self.settings.setdefault("handCount", 0)
self.settings.setdefault("allowHudcacheRebuild", False) # if True speeds up big imports a lot, also
self.database = Database.Database(self.config, sql = self.sql) # includes .connection and .sql variables # stops deadlock problems with threaded imports
self.settings.setdefault("forceThreads", 0) # set to 1/2/more for faster imports
self.settings.setdefault("writeQSize", 1000) # no need to change
self.settings.setdefault("writeQMaxWait", 10) # not used
self.writeq = None
self.database = Database.Database(self.config, sql = self.sql)
self.writerdbs = []
self.settings.setdefault("threads", 1) # value overridden by GuiBulkImport - use forceThreads above
for i in xrange(self.settings['threads']):
self.writerdbs.append( Database.Database(self.config, sql = self.sql) )
self.NEWIMPORT = False self.NEWIMPORT = False
self.allow_hudcache_rebuild = False
#Set functions #Set functions
def setCallHud(self, value): def setCallHud(self, value):
@ -104,6 +116,9 @@ class Importer:
def setThreads(self, value): def setThreads(self, value):
self.settings['threads'] = value self.settings['threads'] = value
if self.settings["threads"] > len(self.writerdbs):
for i in xrange(self.settings['threads'] - len(self.writerdbs)):
self.writerdbs.append( Database.Database(self.config, sql = self.sql) )
def setDropIndexes(self, value): def setDropIndexes(self, value):
self.settings['dropIndexes'] = value self.settings['dropIndexes'] = value
@ -114,6 +129,11 @@ class Importer:
def clearFileList(self): def clearFileList(self):
self.filelist = {} self.filelist = {}
def closeDBs(self):
self.database.disconnect()
for i in xrange(len(self.writerdbs)):
self.writerdbs[i].disconnect()
#Add an individual file to filelist #Add an individual file to filelist
def addImportFile(self, filename, site = "default", filter = "passthrough"): def addImportFile(self, filename, site = "default", filter = "passthrough"):
#TODO: test it is a valid file -> put that in config!! #TODO: test it is a valid file -> put that in config!!
@ -164,61 +184,109 @@ class Importer:
print "Warning: Attempted to add non-directory: '" + str(dir) + "' as an import directory" print "Warning: Attempted to add non-directory: '" + str(dir) + "' as an import directory"
def runImport(self): def runImport(self):
""""Run full import on self.filelist.""" """"Run full import on self.filelist. This is called from GuiBulkImport.py"""
if self.settings['forceThreads'] > 0: # use forceThreads until threading enabled in GuiBulkImport
self.setThreads(self.settings['forceThreads'])
# Initial setup
start = datetime.datetime.now() start = datetime.datetime.now()
starttime = time()
print "Started at", start, "--", len(self.filelist), "files to import.", self.settings['dropIndexes'] print "Started at", start, "--", len(self.filelist), "files to import.", self.settings['dropIndexes']
if self.settings['dropIndexes'] == 'auto': if self.settings['dropIndexes'] == 'auto':
self.settings['dropIndexes'] = self.calculate_auto2(12.0, 500.0) self.settings['dropIndexes'] = self.calculate_auto2(self.database, 12.0, 500.0)
if self.allow_hudcache_rebuild: if self.settings['allowHudcacheRebuild']:
self.settings['dropHudCache'] = self.calculate_auto2(25.0, 500.0) # returns "drop"/"don't drop" self.settings['dropHudCache'] = self.calculate_auto2(self.database, 25.0, 500.0) # returns "drop"/"don't drop"
if self.settings['dropIndexes'] == 'drop': if self.settings['dropIndexes'] == 'drop':
self.database.prepareBulkImport() self.database.prepareBulkImport()
else: else:
print "No need to drop indexes." print "No need to drop indexes."
#print "dropInd =", self.settings['dropIndexes'], " dropHudCache =", self.settings['dropHudCache'] #print "dropInd =", self.settings['dropIndexes'], " dropHudCache =", self.settings['dropHudCache']
if self.settings['threads'] <= 0:
(totstored, totdups, totpartial, toterrors) = self.importFiles(self.database, None)
else:
# create queue (will probably change to deque at some point):
self.writeq = Queue.Queue( self.settings['writeQSize'] )
# start separate thread(s) to read hands from queue and write to db:
for i in xrange(self.settings['threads']):
t = threading.Thread( target=self.writerdbs[i].insert_queue_hands
, args=(self.writeq, self.settings["writeQMaxWait"])
, name="dbwriter-"+str(i) )
t.setDaemon(True)
t.start()
# read hands and write to q:
(totstored, totdups, totpartial, toterrors) = self.importFiles(self.database, self.writeq)
if self.writeq.empty():
print "writers finished already"
pass
else:
print "waiting for writers to finish ..."
#for t in threading.enumerate():
# print " "+str(t)
#self.writeq.join()
#using empty() might be more reliable:
while not self.writeq.empty() and len(threading.enumerate()) > 1:
sleep(0.5)
print " ... writers finished"
# Tidying up after import
if self.settings['dropIndexes'] == 'drop':
self.database.afterBulkImport()
else:
print "No need to rebuild indexes."
if self.settings['allowHudcacheRebuild'] and self.settings['dropHudCache'] == 'drop':
self.database.rebuild_hudcache()
else:
print "No need to rebuild hudcache."
self.database.analyzeDB()
endtime = time()
return (totstored, totdups, totpartial, toterrors, endtime-starttime)
# end def runImport
def importFiles(self, db, q):
""""Read filenames in self.filelist and pass to import_file_dict().
Uses a separate database connection if created as a thread (caller
passes None or no param as db)."""
totstored = 0 totstored = 0
totdups = 0 totdups = 0
totpartial = 0 totpartial = 0
toterrors = 0 toterrors = 0
tottime = 0 tottime = 0
# if threads <= 1: do this bit
for file in self.filelist: for file in self.filelist:
(stored, duplicates, partial, errors, ttime) = self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) (stored, duplicates, partial, errors, ttime) = self.import_file_dict(db, file
,self.filelist[file][0], self.filelist[file][1], q)
totstored += stored totstored += stored
totdups += duplicates totdups += duplicates
totpartial += partial totpartial += partial
toterrors += errors toterrors += errors
tottime += ttime
if self.settings['dropIndexes'] == 'drop':
self.database.afterBulkImport()
else:
print "No need to rebuild indexes."
if self.allow_hudcache_rebuild and self.settings['dropHudCache'] == 'drop':
self.database.rebuild_hudcache()
else:
print "No need to rebuild hudcache."
self.database.analyzeDB()
return (totstored, totdups, totpartial, toterrors, tottime)
# else: import threaded
def calculate_auto(self): for i in xrange( self.settings['threads'] ):
print "sending finish msg qlen =", q.qsize()
db.send_finish_msg(q)
return (totstored, totdups, totpartial, toterrors)
# end def importFiles
# not used currently
def calculate_auto(self, db):
"""An heuristic to determine a reasonable value of drop/don't drop""" """An heuristic to determine a reasonable value of drop/don't drop"""
if len(self.filelist) == 1: return "don't drop" if len(self.filelist) == 1: return "don't drop"
if 'handsInDB' not in self.settings: if 'handsInDB' not in self.settings:
try: try:
tmpcursor = self.database.get_cursor() tmpcursor = db.get_cursor()
tmpcursor.execute("Select count(1) from Hands;") tmpcursor.execute("Select count(1) from Hands;")
self.settings['handsInDB'] = tmpcursor.fetchone()[0] self.settings['handsInDB'] = tmpcursor.fetchone()[0]
except: except:
pass # if this fails we're probably doomed anyway pass # if this fails we're probably doomed anyway
if self.settings['handsInDB'] < 5000: return "drop" if self.settings['handsInDB'] < 5000: return "drop"
if len(self.filelist) < 50: return "don't drop" if len(self.filelist) < 50: return "don't drop"
if self.settings['handsInDB'] > 50000: return "don't drop" if self.settings['handsInDB'] > 50000: return "don't drop"
return "drop" return "drop"
def calculate_auto2(self, scale, increment): def calculate_auto2(self, db, scale, increment):
"""A second heuristic to determine a reasonable value of drop/don't drop """A second heuristic to determine a reasonable value of drop/don't drop
This one adds up size of files to import to guess number of hands in them This one adds up size of files to import to guess number of hands in them
Example values of scale and increment params might be 10 and 500 meaning Example values of scale and increment params might be 10 and 500 meaning
@ -231,7 +299,7 @@ class Importer:
# get number of hands in db # get number of hands in db
if 'handsInDB' not in self.settings: if 'handsInDB' not in self.settings:
try: try:
tmpcursor = self.database.get_cursor() tmpcursor = db.get_cursor()
tmpcursor.execute("Select count(1) from Hands;") tmpcursor.execute("Select count(1) from Hands;")
self.settings['handsInDB'] = tmpcursor.fetchone()[0] self.settings['handsInDB'] = tmpcursor.fetchone()[0]
except: except:
@ -244,7 +312,7 @@ class Importer:
stat_info = os.stat(file) stat_info = os.stat(file)
total_size += stat_info.st_size total_size += stat_info.st_size
# if hands_in_db is zero or very low, we want to drop indexes, otherwise compare # if hands_in_db is zero or very low, we want to drop indexes, otherwise compare
# import size with db size somehow: # import size with db size somehow:
ret = "don't drop" ret = "don't drop"
if self.settings['handsInDB'] < scale * (total_size/size_per_hand) + increment: if self.settings['handsInDB'] < scale * (total_size/size_per_hand) + increment:
@ -253,7 +321,7 @@ class Importer:
# size_per_hand, "inc =", increment, "return:", ret # size_per_hand, "inc =", increment, "return:", ret
return ret return ret
#Run import on updated files, then store latest update time. #Run import on updated files, then store latest update time. Called from GuiAutoImport.py
def runUpdated(self): def runUpdated(self):
#Check for new files in monitored directories #Check for new files in monitored directories
#todo: make efficient - always checks for new file, should be able to use mtime of directory #todo: make efficient - always checks for new file, should be able to use mtime of directory
@ -268,11 +336,11 @@ class Importer:
if os.path.exists(file): if os.path.exists(file):
stat_info = os.stat(file) stat_info = os.stat(file)
#rulog.writelines("path exists ") #rulog.writelines("path exists ")
try: try:
lastupdate = self.updated[file] lastupdate = self.updated[file]
#rulog.writelines("lastupdate = %d, mtime = %d" % (lastupdate,stat_info.st_mtime)) #rulog.writelines("lastupdate = %d, mtime = %d" % (lastupdate,stat_info.st_mtime))
if stat_info.st_mtime > lastupdate: if stat_info.st_mtime > lastupdate:
self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) self.import_file_dict(self.database, file, self.filelist[file][0], self.filelist[file][1])
self.updated[file] = time() self.updated[file] = time()
except: except:
self.updated[file] = time() self.updated[file] = time()
@ -281,10 +349,10 @@ class Importer:
if os.path.isdir(file) or (time() - stat_info.st_mtime) < 60: if os.path.isdir(file) or (time() - stat_info.st_mtime) < 60:
# TODO attach a HHC thread to the file # TODO attach a HHC thread to the file
# TODO import the output of the HHC thread -- this needs to wait for the HHC to block? # TODO import the output of the HHC thread -- this needs to wait for the HHC to block?
self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) self.import_file_dict(self.database, file, self.filelist[file][0], self.filelist[file][1], None)
# TODO we also test if directory, why? # TODO we also test if directory, why?
#if os.path.isdir(file): #if os.path.isdir(file):
#self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) #self.import_file_dict(self.database, file, self.filelist[file][0], self.filelist[file][1])
else: else:
self.removeFromFileList[file] = True self.removeFromFileList[file] = True
self.addToDirList = filter(lambda x: self.addImportDirectory(x, True, self.addToDirList[x][0], self.addToDirList[x][1]), self.addToDirList) self.addToDirList = filter(lambda x: self.addImportDirectory(x, True, self.addToDirList[x][0], self.addToDirList[x][1]), self.addToDirList)
@ -292,7 +360,7 @@ class Importer:
for file in self.removeFromFileList: for file in self.removeFromFileList:
if file in self.filelist: if file in self.filelist:
del self.filelist[file] del self.filelist[file]
self.addToDirList = {} self.addToDirList = {}
self.removeFromFileList = {} self.removeFromFileList = {}
self.database.rollback() self.database.rollback()
@ -300,13 +368,15 @@ class Importer:
#rulog.close() #rulog.close()
# This is now an internal function that should not be called directly. # This is now an internal function that should not be called directly.
def import_file_dict(self, file, site, filter): def import_file_dict(self, db, file, site, filter, q=None):
#print "import_file_dict" #print "import_file_dict"
if os.path.isdir(file): if os.path.isdir(file):
self.addToDirList[file] = [site] + [filter] self.addToDirList[file] = [site] + [filter]
return return
conv = None conv = None
(stored, duplicates, partial, errors, ttime) = (0, 0, 0, 0, 0)
# Load filter, process file, pass returned filename to import_fpdb_file # Load filter, process file, pass returned filename to import_fpdb_file
print "\nConverting %s" % file print "\nConverting %s" % file
@ -325,7 +395,7 @@ class Importer:
if callable(obj): if callable(obj):
conv = obj(in_path = file, out_path = out_path, index = 0) # Index into file 0 until changeover conv = obj(in_path = file, out_path = out_path, index = 0) # Index into file 0 until changeover
if(conv.getStatus() and self.NEWIMPORT == False): if(conv.getStatus() and self.NEWIMPORT == False):
(stored, duplicates, partial, errors, ttime) = self.import_fpdb_file(out_path, site) (stored, duplicates, partial, errors, ttime) = self.import_fpdb_file(db, out_path, site, q)
elif (conv.getStatus() and self.NEWIMPORT == True): elif (conv.getStatus() and self.NEWIMPORT == True):
#This code doesn't do anything yet #This code doesn't do anything yet
handlist = hhc.getProcessedHands() handlist = hhc.getProcessedHands()
@ -346,11 +416,11 @@ class Importer:
return (stored, duplicates, partial, errors, ttime) return (stored, duplicates, partial, errors, ttime)
def import_fpdb_file(self, file, site): def import_fpdb_file(self, db, file, site, q):
#print "import_fpdb_file"
starttime = time() starttime = time()
last_read_hand = 0 last_read_hand = 0
loc = 0 loc = 0
(stored, duplicates, partial, errors, ttime) = (0, 0, 0, 0, 0)
#print "file =", file #print "file =", file
if file == "stdin": if file == "stdin":
inputFile = sys.stdin inputFile = sys.stdin
@ -377,13 +447,39 @@ class Importer:
self.pos_in_file[file] = inputFile.tell() self.pos_in_file[file] = inputFile.tell()
inputFile.close() inputFile.close()
#self.database.lock_for_insert() # should be ok when using one thread (stored, duplicates, partial, errors, ttime, handsId) = self.import_fpdb_lines(db, self.lines, starttime, file, site, q)
db.commit()
ttime = time() - starttime
if q == None:
print "\rTotal stored:", stored, " duplicates:", duplicates, "errors:", errors, " time:", ttime
if not stored:
if duplicates:
for line_no in xrange(len(self.lines)):
if self.lines[line_no].find("Game #")!=-1:
final_game_line=self.lines[line_no]
handsId=fpdb_simple.parseSiteHandNo(final_game_line)
else:
print "failed to read a single hand from file:", inputFile
handsId=0
#todo: this will cause return of an unstored hand number if the last hand was error
self.handsId=handsId
return (stored, duplicates, partial, errors, ttime)
# end def import_fpdb_file
def import_fpdb_lines(self, db, lines, starttime, file, site, q = None):
"""Import an fpdb hand history held in the list lines, could be one hand or many"""
#db.lock_for_insert() # should be ok when using one thread, but doesn't help??
try: # sometimes we seem to be getting an empty self.lines, in which case, we just want to return. try: # sometimes we seem to be getting an empty self.lines, in which case, we just want to return.
firstline = self.lines[0] firstline = lines[0]
except: except:
# just skip the debug message and return silently: # just skip the debug message and return silently:
#print "DEBUG: import_fpdb_file: failed on self.lines[0]: '%s' '%s' '%s' '%s' " %( file, site, self.lines, loc) #print "DEBUG: import_fpdb_file: failed on lines[0]: '%s' '%s' '%s' '%s' " %( file, site, lines, loc)
return (0,0,0,1,0) return (0,0,0,1,0)
if firstline.find("Tournament Summary")!=-1: if firstline.find("Tournament Summary")!=-1:
@ -399,16 +495,18 @@ class Importer:
duplicates = 0 #counter duplicates = 0 #counter
partial = 0 #counter partial = 0 #counter
errors = 0 #counter errors = 0 #counter
ttime = None
handsId = -1
for i in xrange (len(self.lines)): for i in xrange (len(lines)):
if (len(self.lines[i])<2): #Wierd way to detect for '\r\n' or '\n' if (len(lines[i])<2): #Wierd way to detect for '\r\n' or '\n'
endpos=i endpos=i
hand=self.lines[startpos:endpos] hand=lines[startpos:endpos]
if (len(hand[0])<2): if (len(hand[0])<2):
hand=hand[1:] hand=hand[1:]
if (len(hand)<3): if (len(hand)<3):
pass pass
#TODO: This is ugly - we didn't actually find the start of the #TODO: This is ugly - we didn't actually find the start of the
@ -420,10 +518,10 @@ class Importer:
self.hand=hand self.hand=hand
try: try:
handsId = fpdb_parse_logic.mainParser( self.settings handsId = fpdb_parse_logic.mainParser( self.settings, self.siteIds[site]
, self.siteIds[site], category, hand , category, hand, self.config
, self.config, self.database ) , db, q )
self.database.commit() db.commit()
stored += 1 stored += 1
if self.callHud: if self.callHud:
@ -433,29 +531,29 @@ class Importer:
self.caller.pipe_to_hud.stdin.write("%s" % (handsId) + os.linesep) self.caller.pipe_to_hud.stdin.write("%s" % (handsId) + os.linesep)
except fpdb_simple.DuplicateError: except fpdb_simple.DuplicateError:
duplicates += 1 duplicates += 1
self.database.rollback() db.rollback()
except (ValueError), fe: except (ValueError), fe:
errors += 1 errors += 1
self.printEmailErrorMessage(errors, file, hand) self.printEmailErrorMessage(errors, file, hand)
if (self.settings['failOnError']): if (self.settings['failOnError']):
self.database.commit() #dont remove this, in case hand processing was cancelled. db.commit() #dont remove this, in case hand processing was cancelled.
raise raise
else: else:
self.database.rollback() db.rollback()
except (fpdb_simple.FpdbError), fe: except (fpdb_simple.FpdbError), fe:
errors += 1 errors += 1
self.printEmailErrorMessage(errors, file, hand) self.printEmailErrorMessage(errors, file, hand)
self.database.rollback() db.rollback()
if self.settings['failOnError']: if self.settings['failOnError']:
self.database.commit() #dont remove this, in case hand processing was cancelled. db.commit() #dont remove this, in case hand processing was cancelled.
raise raise
if self.settings['minPrint']: if self.settings['minPrint']:
if not ((stored+duplicates+errors) % self.settings['minPrint']): if not ((stored+duplicates+errors) % self.settings['minPrint']):
print "stored:", stored, "duplicates:", duplicates, "errors:", errors print "stored:", stored, " duplicates:", duplicates, "errors:", errors
if self.settings['handCount']: if self.settings['handCount']:
if ((stored+duplicates+errors) >= self.settings['handCount']): if ((stored+duplicates+errors) >= self.settings['handCount']):
if not self.settings['quiet']: if not self.settings['quiet']:
@ -463,22 +561,8 @@ class Importer:
print "Total stored:", stored, "duplicates:", duplicates, "errors:", errors, " time:", (time() - starttime) print "Total stored:", stored, "duplicates:", duplicates, "errors:", errors, " time:", (time() - starttime)
sys.exit(0) sys.exit(0)
startpos = endpos startpos = endpos
ttime = time() - starttime return (stored, duplicates, partial, errors, ttime, handsId)
print "\rTotal stored:", stored, "duplicates:", duplicates, "errors:", errors, " time:", ttime # end def import_fpdb_lines
if not stored:
if duplicates:
for line_no in xrange(len(self.lines)):
if self.lines[line_no].find("Game #")!=-1:
final_game_line=self.lines[line_no]
handsId=fpdb_simple.parseSiteHandNo(final_game_line)
else:
print "failed to read a single hand from file:", inputFile
handsId=0
#todo: this will cause return of an unstored hand number if the last hand was error
self.database.commit()
self.handsId=handsId
return (stored, duplicates, partial, errors, ttime)
def printEmailErrorMessage(self, errors, filename, line): def printEmailErrorMessage(self, errors, filename, line):
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)

View File

@ -25,13 +25,12 @@ from time import time, strftime
#parses a holdem hand #parses a holdem hand
def mainParser(settings, siteID, category, hand, config, db = None): def mainParser(settings, siteID, category, hand, config, db = None, writeq = None):
#print "mainparser"
# fdb is not used now - to be removed ...
t0 = time() t0 = time()
#print "mainparser" #print "mainparser"
backend = settings['db-backend'] backend = settings['db-backend']
# Ideally db connection is passed in, if not use sql list if passed in, otherwise start from scratch
if db == None: if db == None:
db = Database.Database(c = config, sql = None) db = Database.Database(c = config, sql = None)
category = fpdb_simple.recogniseCategory(hand[0]) category = fpdb_simple.recogniseCategory(hand[0])
@ -80,7 +79,6 @@ def mainParser(settings, siteID, category, hand, config, db = None):
rebuyOrAddon = -1 rebuyOrAddon = -1
tourneyTypeId = 1 tourneyTypeId = 1
fpdb_simple.isAlreadyInDB(db.get_cursor(), gametypeID, siteHandNo) fpdb_simple.isAlreadyInDB(db.get_cursor(), gametypeID, siteHandNo)
hand = fpdb_simple.filterCrap(hand, isTourney) hand = fpdb_simple.filterCrap(hand, isTourney)
@ -177,7 +175,13 @@ def mainParser(settings, siteID, category, hand, config, db = None):
, positions, antes, cardValues, cardSuits, boardValues, boardSuits , positions, antes, cardValues, cardSuits, boardValues, boardSuits
, winnings, rakes, actionTypes, allIns, actionAmounts , winnings, rakes, actionTypes, allIns, actionAmounts
, actionNos, hudImportData, maxSeats, tableName, seatNos) , actionNos, hudImportData, maxSeats, tableName, seatNos)
result = db.store_the_hand(htw)
# save hand in db via direct call or via q if in a thread
if writeq == None:
result = db.store_the_hand(htw)
else:
writeq.put(htw)
result = -999 # meaning unknown
t9 = time() t9 = time()
#print "parse and save=(%4.3f)" % (t9-t0) #print "parse and save=(%4.3f)" % (t9-t0)