diff --git a/pyfpdb/Database.py b/pyfpdb/Database.py index 8d58da50..e72c2882 100755 --- a/pyfpdb/Database.py +++ b/pyfpdb/Database.py @@ -31,6 +31,7 @@ from time import time, strftime import string import re import logging +import Queue # pyGTK modules @@ -469,23 +470,19 @@ class Database: ,action_types, allIns, action_amounts, actionNos, hudImportData, maxSeats, tableName ,seatNos): - try: - fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) + fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) - hands_id = self.storeHands(self.backend, site_hand_no, gametype_id - ,hand_start_time, names, tableName, maxSeats, hudImportData - ,(None, None, None, None, None), (None, None, None, None, None)) + hands_id = self.storeHands(self.backend, site_hand_no, gametype_id + ,hand_start_time, names, tableName, maxSeats, hudImportData + ,(None, None, None, None, None), (None, None, None, None, None)) - #print "before calling store_hands_players_stud, antes:", antes - hands_players_ids = self.store_hands_players_stud(self.backend, hands_id, player_ids - ,start_cashes, antes, card_values - ,card_suits, winnings, rakes, seatNos) + #print "before calling store_hands_players_stud, antes:", antes + hands_players_ids = self.store_hands_players_stud(self.backend, hands_id, player_ids + ,start_cashes, antes, card_values + ,card_suits, winnings, rakes, seatNos) - if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': - self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) - except: - print "ring_stud error: " + str(sys.exc_value) # in case exception doesn't get printed - raise fpdb_simple.FpdbError("ring_stud error: " + str(sys.exc_value)) + if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': + self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) return hands_id #end def ring_stud @@ -496,30 +493,26 @@ class Database: ,action_amounts, actionNos, hudImportData, maxSeats, tableName, seatNos): """stores a holdem/omaha hand into the database""" - try: - t0 = time() - #print "in ring_holdem_omaha" - fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) - t1 = time() - fpdb_simple.fill_board_cards(board_values, board_suits) - t2 = time() + t0 = time() + #print "in ring_holdem_omaha" + fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) + t1 = time() + fpdb_simple.fill_board_cards(board_values, board_suits) + t2 = time() - hands_id = self.storeHands(self.backend, site_hand_no, gametype_id - ,hand_start_time, names, tableName, maxSeats - ,hudImportData, board_values, board_suits) - #TEMPORARY CALL! - Just until all functions are migrated - t3 = time() - hands_players_ids = self.store_hands_players_holdem_omaha( - self.backend, category, hands_id, player_ids, start_cashes - , positions, card_values, card_suits, winnings, rakes, seatNos, hudImportData) - t4 = time() - if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': - self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) - t5 = time() - #print "fills=(%4.3f) saves=(%4.3f,%4.3f,%4.3f)" % (t2-t0, t3-t2, t4-t3, t5-t4) - except: - print "ring_holdem_omaha error: " + str(sys.exc_value) # in case exception doesn't get printed - raise fpdb_simple.FpdbError("ring_holdem_omaha error: " + str(sys.exc_value)) + hands_id = self.storeHands(self.backend, site_hand_no, gametype_id + ,hand_start_time, names, tableName, maxSeats + ,hudImportData, board_values, board_suits) + #TEMPORARY CALL! - Just until all functions are migrated + t3 = time() + hands_players_ids = self.store_hands_players_holdem_omaha( + self.backend, category, hands_id, player_ids, start_cashes + , positions, card_values, card_suits, winnings, rakes, seatNos, hudImportData) + t4 = time() + if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': + self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) + t5 = time() + #print "fills=(%4.3f) saves=(%4.3f,%4.3f,%4.3f)" % (t2-t0, t3-t2, t4-t3, t5-t4) return hands_id #end def ring_holdem_omaha @@ -532,28 +525,24 @@ class Database: ,actionNos, hudImportData, maxSeats, tableName, seatNos): """stores a tourney holdem/omaha hand into the database""" - try: - fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) - fpdb_simple.fill_board_cards(board_values, board_suits) + fpdb_simple.fillCardArrays(len(names), base, category, card_values, card_suits) + fpdb_simple.fill_board_cards(board_values, board_suits) - tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourney_start) - tourneys_players_ids = self.store_tourneys_players(tourney_id, player_ids, payin_amounts, ranks, winnings) + tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourney_start) + tourneys_players_ids = self.store_tourneys_players(tourney_id, player_ids, payin_amounts, ranks, winnings) - hands_id = self.storeHands(self.backend, site_hand_no, gametype_id - ,hand_start_time, names, tableName, maxSeats - ,hudImportData, board_values, board_suits) + hands_id = self.storeHands(self.backend, site_hand_no, gametype_id + ,hand_start_time, names, tableName, maxSeats + ,hudImportData, board_values, board_suits) - hands_players_ids = self.store_hands_players_holdem_omaha_tourney( - self.backend, category, hands_id, player_ids, start_cashes, positions - , card_values, card_suits, winnings, rakes, seatNos, tourneys_players_ids - , hudImportData) + hands_players_ids = self.store_hands_players_holdem_omaha_tourney( + self.backend, category, hands_id, player_ids, start_cashes, positions + , card_values, card_suits, winnings, rakes, seatNos, tourneys_players_ids + , hudImportData) - #print "tourney holdem, backend=%d" % backend - if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': - self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) - except: - print "tourney_holdem_omaha error: " + str(sys.exc_value) # in case exception doesn't get printed - raise fpdb_simple.FpdbError("tourney_holdem_omaha error: " + str(sys.exc_value)) + #print "tourney holdem, backend=%d" % backend + if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': + self.storeHudCache(self.backend, base, category, gametype_id, hand_start_time, player_ids, hudImportData) return hands_id #end def tourney_holdem_omaha @@ -565,26 +554,22 @@ class Database: ,actionNos, hudImportData, maxSeats, tableName, seatNos): #stores a tourney stud/razz hand into the database - try: - fpdb_simple.fillCardArrays(len(names), base, category, cardValues, cardSuits) + fpdb_simple.fillCardArrays(len(names), base, category, cardValues, cardSuits) - tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourneyStartTime) + tourney_id = self.store_tourneys(tourneyTypeId, siteTourneyNo, entries, prizepool, tourneyStartTime) - tourneys_players_ids = self.store_tourneys_players(tourney_id, playerIds, payin_amounts, ranks, winnings) + tourneys_players_ids = self.store_tourneys_players(tourney_id, playerIds, payin_amounts, ranks, winnings) - hands_id = self.storeHands( self.backend, siteHandNo, gametypeId - , handStartTime, names, tableName, maxSeats - , hudImportData, board_values, board_suits ) + hands_id = self.storeHands( self.backend, siteHandNo, gametypeId + , handStartTime, names, tableName, maxSeats + , hudImportData, board_values, board_suits ) - hands_players_ids = self.store_hands_players_stud_tourney(self.backend, hands_id - , playerIds, startCashes, antes, cardValues, cardSuits - , winnings, rakes, seatNos, tourneys_players_ids) + hands_players_ids = self.store_hands_players_stud_tourney(self.backend, hands_id + , playerIds, startCashes, antes, cardValues, cardSuits + , winnings, rakes, seatNos, tourneys_players_ids) - if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': - self.storeHudCache(self.backend, base, category, gametypeId, hand_start_time, playerIds, hudImportData) - except: - print "tourney_stud error: " + str(sys.exc_value) # in case exception doesn't get printed - raise fpdb_simple.FpdbError("tourney_stud error: " + str(sys.exc_value)) + if 'dropHudCache' not in settings or settings['dropHudCache'] != 'drop': + self.storeHudCache(self.backend, base, category, gametypeId, hand_start_time, playerIds, hudImportData) return hands_id #end def tourney_stud @@ -613,7 +598,7 @@ class Database: "AND referenced_column_name = %s ", (fk['fktab'], fk['fkcol'], fk['rtab'], fk['rcol']) ) cons = c.fetchone() - print "preparebulk find fk: cons=", cons + #print "preparebulk find fk: cons=", cons if cons: print "dropping mysql fk", cons[0], fk['fktab'], fk['fkcol'] try: @@ -994,7 +979,7 @@ class Database: result = self.tourney_holdem_omaha( h.config, h.settings, h.base, h.category, h.siteTourneyNo, h.buyin , h.fee, h.knockout, h.entries, h.prizepool, h.tourneyStartTime - , h.payin_amounts, h.ranks, h.tourneyTypeId, h.siteID, h.siteHandNo + , payin_amounts, ranks, h.tourneyTypeId, h.siteID, h.siteHandNo , h.gametypeID, h.handStartTime, h.names, h.playerIDs, h.startCashes , h.positions, h.cardValues, h.cardSuits, h.boardValues, h.boardSuits , h.winnings, h.rakes, h.actionTypes, h.allIns, h.actionAmounts @@ -1003,13 +988,13 @@ class Database: result = self.tourney_stud( h.config, h.settings, h.base, h.category, h.siteTourneyNo , h.buyin, h.fee, h.knockout, h.entries, h.prizepool, h.tourneyStartTime - , h.payin_amounts, h.ranks, h.tourneyTypeId, h.siteID, h.siteHandNo + , payin_amounts, ranks, h.tourneyTypeId, h.siteID, h.siteHandNo , h.gametypeID, h.handStartTime, h.names, h.playerIDs, h.startCashes , h.antes, h.cardValues, h.cardSuits, h.winnings, h.rakes, h.actionTypes , h.allIns, h.actionAmounts, h.actionNos, h.hudImportData, h.maxSeats , h.tableName, h.seatNos) else: - raise fpself.simple.Fpself.rror("unrecognised category") + raise fpdb_simple.FpdbError("unrecognised category") else: if h.base == "hold": result = self.ring_holdem_omaha( @@ -1027,11 +1012,13 @@ class Database: , h.actionAmounts, h.actionNos, h.hudImportData, h.maxSeats, h.tableName , h.seatNos) else: - raise fpself.simple.Fpself.rror ("unrecognised category") - self.commit() + raise fpdb_simple.FpdbError("unrecognised category") except: print "Error storing hand: " + str(sys.exc_value) self.rollback() + # re-raise the exception so that the calling routine can decide what to do: + # (e.g. a write thread might try again) + raise return result #end def store_the_hand @@ -1576,8 +1563,85 @@ class Database: #end def store_tourneys_players + # read HandToWrite objects from q and insert into database + def insert_queue_hands(self, q, maxwait=10, commitEachHand=True): + n,fails,maxTries,firstWait = 0,0,4,0.1 + sendFinal = False + t0 = time() + while True: + try: + h = q.get(True) # (True,maxWait) has probs if 1st part of import is all dups + except Queue.Empty: + # Queue.Empty exception thrown if q was empty for + # if q.empty() also possible - no point if testing for Queue.Empty exception + # maybe increment a counter and only break after a few times? + # could also test threading.active_count() or look through threading.enumerate() + # so break immediately if no threads, but count up to X exceptions if a writer + # thread is still alive??? + print "queue empty too long - writer stopping ..." + break + except: + print "writer stopping, error reading queue: " + str(sys.exc_info()) + break + #print "got hand", str(h.get_finished()) + + tries,wait,again = 0,firstWait,True + while again: + try: + again = False # set this immediately to avoid infinite loops! + if h.get_finished(): + # all items on queue processed + sendFinal = True + else: + self.store_the_hand(h) + # optional commit, could be every hand / every N hands / every time a + # commit message received?? mark flag to indicate if commits outstanding + if commitEachHand: + self.commit() + n = n + 1 + except: + #print "iqh store error", sys.exc_value # debug + self.rollback() + if re.search('deadlock', str(sys.exc_info()[1]), re.I): + # deadlocks only a problem if hudcache is being updated + tries = tries + 1 + if tries < maxTries and wait < 5: # wait < 5 just to make sure + print "deadlock detected - trying again ..." + time.sleep(wait) + wait = wait + wait + again = True + else: + print "too many deadlocks - failed to store hand " + h.get_siteHandNo() + if not again: + fails = fails + 1 + err = traceback.extract_tb(sys.exc_info()[2])[-1] + print "***Error storing hand: "+err[2]+"("+str(err[1])+"): "+str(sys.exc_info()[1]) + # finished trying to store hand + + # always reduce q count, whether or not this hand was saved ok + q.task_done() + # while True loop + + self.commit() + if sendFinal: + q.task_done() + print "db writer finished: stored %d hands (%d fails) in %.1f seconds" % (n, fails, time()-t0) + # end def insert_queue_hands(): + + + def send_finish_msg(self, q): + try: + h = HandToWrite(True) + q.put(h) + except: + err = traceback.extract_tb(sys.exc_info()[2])[-1] + print "***Error sending finish: "+err[2]+"("+str(err[1])+"): "+str(sys.exc_info()[1]) + # end def send_finish_msg(): + + # Class used to hold all the data needed to write a hand to the db # mainParser() in fpdb_parse_logic.py creates one of these and then passes it to +# self.insert_queue_hands() class HandToWrite: @@ -1675,49 +1739,6 @@ class HandToWrite: raise # end def set_hand - def set_ring_holdem_omaha( self, config, settings, base, category, siteHandNo - , gametypeID, handStartTime, names, playerIDs - , startCashes, positions, cardValues, cardSuits - , boardValues, boardSuits, winnings, rakes - , actionTypes, allIns, actionAmounts, actionNos - , hudImportData, maxSeats, tableName, seatNos ): - self.config = config - self.settings = settings - self.base = base - self.category = category - self.siteHandNo = siteHandNo - self.gametypeID = gametypeID - self.handStartTime = handStartTime - self.names = names - self.playerIDs = playerIDs - self.startCashes = startCashes - self.positions = positions - self.cardValues = cardValues - self.cardSuits = cardSuits - self.boardValues = boardValues - self.boardSuits = boardSuits - self.winnings = winnings - self.rakes = rakes - self.actionTypes = actionTypes - self.allIns = allIns - self.actionAmounts = actionAmounts - self.actionNos = actionNos - self.hudImportData = hudImportData - self.maxSeats = maxSeats - self.tableName = tableName - self.seatNos = seatNos - # end def set_ring_holdem_omaha - - def send_ring_holdem_omaha(self, db): - result = db.ring_holdem_omaha( - self.config, self.settings, self.base, self.category, self.siteHandNo - , self.gametypeID, self.handStartTime, self.names, self.playerIDs - , self.startCashes, self.positions, self.cardValues, self.cardSuits - , self.boardValues, self.boardSuits, self.winnings, self.rakes - , self.actionTypes, self.allIns, self.actionAmounts, self.actionNos - , self.hudImportData, self.maxSeats, self.tableName, self.seatNos) - # end def send_ring_holdem_omaha - def get_finished(self): return( self.finished ) # end def get_finished diff --git a/pyfpdb/fpdb_import.py b/pyfpdb/fpdb_import.py index ed87731b..8ad617f2 100644 --- a/pyfpdb/fpdb_import.py +++ b/pyfpdb/fpdb_import.py @@ -21,12 +21,15 @@ import os # todo: remove this once import_dir is in fpdb_import import sys -from time import time, strftime +from time import time, strftime, sleep import logging import traceback import math import datetime import re +import Queue +from collections import deque # using Queue for now +import threading # fpdb/FreePokerTools modules @@ -42,11 +45,11 @@ try: mysqlLibFound=True except: pass - + try: import psycopg2 pgsqlLibFound=True - import psycopg2.extensions + import psycopg2.extensions psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) except: @@ -61,7 +64,6 @@ class Importer: self.config = config self.sql = sql - self.database = None # database will be the main db interface eventually self.filelist = {} self.dirlist = {} self.siteIds = {} @@ -74,14 +76,24 @@ class Importer: self.pos_in_file = {} # dict to remember how far we have read in the file #Set defaults self.callHud = self.config.get_import_parameters().get("callFpdbHud") - + + # CONFIGURATION OPTIONS - update allowHudcacheRebuild and forceThreads for faster imports self.settings.setdefault("minPrint", 30) self.settings.setdefault("handCount", 0) - - self.database = Database.Database(self.config, sql = self.sql) # includes .connection and .sql variables + self.settings.setdefault("allowHudcacheRebuild", False) # if True speeds up big imports a lot, also + # stops deadlock problems with threaded imports + self.settings.setdefault("forceThreads", 0) # set to 1/2/more for faster imports + self.settings.setdefault("writeQSize", 1000) # no need to change + self.settings.setdefault("writeQMaxWait", 10) # not used + + self.writeq = None + self.database = Database.Database(self.config, sql = self.sql) + self.writerdbs = [] + self.settings.setdefault("threads", 1) # value overridden by GuiBulkImport - use forceThreads above + for i in xrange(self.settings['threads']): + self.writerdbs.append( Database.Database(self.config, sql = self.sql) ) self.NEWIMPORT = False - self.allow_hudcache_rebuild = False #Set functions def setCallHud(self, value): @@ -104,6 +116,9 @@ class Importer: def setThreads(self, value): self.settings['threads'] = value + if self.settings["threads"] > len(self.writerdbs): + for i in xrange(self.settings['threads'] - len(self.writerdbs)): + self.writerdbs.append( Database.Database(self.config, sql = self.sql) ) def setDropIndexes(self, value): self.settings['dropIndexes'] = value @@ -114,6 +129,11 @@ class Importer: def clearFileList(self): self.filelist = {} + def closeDBs(self): + self.database.disconnect() + for i in xrange(len(self.writerdbs)): + self.writerdbs[i].disconnect() + #Add an individual file to filelist def addImportFile(self, filename, site = "default", filter = "passthrough"): #TODO: test it is a valid file -> put that in config!! @@ -164,61 +184,109 @@ class Importer: print "Warning: Attempted to add non-directory: '" + str(dir) + "' as an import directory" def runImport(self): - """"Run full import on self.filelist.""" + """"Run full import on self.filelist. This is called from GuiBulkImport.py""" + if self.settings['forceThreads'] > 0: # use forceThreads until threading enabled in GuiBulkImport + self.setThreads(self.settings['forceThreads']) + # Initial setup start = datetime.datetime.now() + starttime = time() print "Started at", start, "--", len(self.filelist), "files to import.", self.settings['dropIndexes'] if self.settings['dropIndexes'] == 'auto': - self.settings['dropIndexes'] = self.calculate_auto2(12.0, 500.0) - if self.allow_hudcache_rebuild: - self.settings['dropHudCache'] = self.calculate_auto2(25.0, 500.0) # returns "drop"/"don't drop" + self.settings['dropIndexes'] = self.calculate_auto2(self.database, 12.0, 500.0) + if self.settings['allowHudcacheRebuild']: + self.settings['dropHudCache'] = self.calculate_auto2(self.database, 25.0, 500.0) # returns "drop"/"don't drop" if self.settings['dropIndexes'] == 'drop': self.database.prepareBulkImport() else: print "No need to drop indexes." #print "dropInd =", self.settings['dropIndexes'], " dropHudCache =", self.settings['dropHudCache'] + + if self.settings['threads'] <= 0: + (totstored, totdups, totpartial, toterrors) = self.importFiles(self.database, None) + else: + # create queue (will probably change to deque at some point): + self.writeq = Queue.Queue( self.settings['writeQSize'] ) + # start separate thread(s) to read hands from queue and write to db: + for i in xrange(self.settings['threads']): + t = threading.Thread( target=self.writerdbs[i].insert_queue_hands + , args=(self.writeq, self.settings["writeQMaxWait"]) + , name="dbwriter-"+str(i) ) + t.setDaemon(True) + t.start() + # read hands and write to q: + (totstored, totdups, totpartial, toterrors) = self.importFiles(self.database, self.writeq) + + if self.writeq.empty(): + print "writers finished already" + pass + else: + print "waiting for writers to finish ..." + #for t in threading.enumerate(): + # print " "+str(t) + #self.writeq.join() + #using empty() might be more reliable: + while not self.writeq.empty() and len(threading.enumerate()) > 1: + sleep(0.5) + print " ... writers finished" + + # Tidying up after import + if self.settings['dropIndexes'] == 'drop': + self.database.afterBulkImport() + else: + print "No need to rebuild indexes." + if self.settings['allowHudcacheRebuild'] and self.settings['dropHudCache'] == 'drop': + self.database.rebuild_hudcache() + else: + print "No need to rebuild hudcache." + self.database.analyzeDB() + endtime = time() + return (totstored, totdups, totpartial, toterrors, endtime-starttime) + # end def runImport + + def importFiles(self, db, q): + """"Read filenames in self.filelist and pass to import_file_dict(). + Uses a separate database connection if created as a thread (caller + passes None or no param as db).""" + totstored = 0 totdups = 0 totpartial = 0 toterrors = 0 tottime = 0 -# if threads <= 1: do this bit for file in self.filelist: - (stored, duplicates, partial, errors, ttime) = self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) + (stored, duplicates, partial, errors, ttime) = self.import_file_dict(db, file + ,self.filelist[file][0], self.filelist[file][1], q) totstored += stored totdups += duplicates totpartial += partial toterrors += errors - tottime += ttime - if self.settings['dropIndexes'] == 'drop': - self.database.afterBulkImport() - else: - print "No need to rebuild indexes." - if self.allow_hudcache_rebuild and self.settings['dropHudCache'] == 'drop': - self.database.rebuild_hudcache() - else: - print "No need to rebuild hudcache." - self.database.analyzeDB() - return (totstored, totdups, totpartial, toterrors, tottime) -# else: import threaded - def calculate_auto(self): + for i in xrange( self.settings['threads'] ): + print "sending finish msg qlen =", q.qsize() + db.send_finish_msg(q) + + return (totstored, totdups, totpartial, toterrors) + # end def importFiles + + # not used currently + def calculate_auto(self, db): """An heuristic to determine a reasonable value of drop/don't drop""" if len(self.filelist) == 1: return "don't drop" if 'handsInDB' not in self.settings: try: - tmpcursor = self.database.get_cursor() + tmpcursor = db.get_cursor() tmpcursor.execute("Select count(1) from Hands;") self.settings['handsInDB'] = tmpcursor.fetchone()[0] except: pass # if this fails we're probably doomed anyway if self.settings['handsInDB'] < 5000: return "drop" - if len(self.filelist) < 50: return "don't drop" + if len(self.filelist) < 50: return "don't drop" if self.settings['handsInDB'] > 50000: return "don't drop" return "drop" - def calculate_auto2(self, scale, increment): + def calculate_auto2(self, db, scale, increment): """A second heuristic to determine a reasonable value of drop/don't drop This one adds up size of files to import to guess number of hands in them Example values of scale and increment params might be 10 and 500 meaning @@ -231,7 +299,7 @@ class Importer: # get number of hands in db if 'handsInDB' not in self.settings: try: - tmpcursor = self.database.get_cursor() + tmpcursor = db.get_cursor() tmpcursor.execute("Select count(1) from Hands;") self.settings['handsInDB'] = tmpcursor.fetchone()[0] except: @@ -244,7 +312,7 @@ class Importer: stat_info = os.stat(file) total_size += stat_info.st_size - # if hands_in_db is zero or very low, we want to drop indexes, otherwise compare + # if hands_in_db is zero or very low, we want to drop indexes, otherwise compare # import size with db size somehow: ret = "don't drop" if self.settings['handsInDB'] < scale * (total_size/size_per_hand) + increment: @@ -253,7 +321,7 @@ class Importer: # size_per_hand, "inc =", increment, "return:", ret return ret - #Run import on updated files, then store latest update time. + #Run import on updated files, then store latest update time. Called from GuiAutoImport.py def runUpdated(self): #Check for new files in monitored directories #todo: make efficient - always checks for new file, should be able to use mtime of directory @@ -268,11 +336,11 @@ class Importer: if os.path.exists(file): stat_info = os.stat(file) #rulog.writelines("path exists ") - try: + try: lastupdate = self.updated[file] #rulog.writelines("lastupdate = %d, mtime = %d" % (lastupdate,stat_info.st_mtime)) if stat_info.st_mtime > lastupdate: - self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) + self.import_file_dict(self.database, file, self.filelist[file][0], self.filelist[file][1]) self.updated[file] = time() except: self.updated[file] = time() @@ -281,10 +349,10 @@ class Importer: if os.path.isdir(file) or (time() - stat_info.st_mtime) < 60: # TODO attach a HHC thread to the file # TODO import the output of the HHC thread -- this needs to wait for the HHC to block? - self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) + self.import_file_dict(self.database, file, self.filelist[file][0], self.filelist[file][1], None) # TODO we also test if directory, why? #if os.path.isdir(file): - #self.import_file_dict(file, self.filelist[file][0], self.filelist[file][1]) + #self.import_file_dict(self.database, file, self.filelist[file][0], self.filelist[file][1]) else: self.removeFromFileList[file] = True self.addToDirList = filter(lambda x: self.addImportDirectory(x, True, self.addToDirList[x][0], self.addToDirList[x][1]), self.addToDirList) @@ -292,7 +360,7 @@ class Importer: for file in self.removeFromFileList: if file in self.filelist: del self.filelist[file] - + self.addToDirList = {} self.removeFromFileList = {} self.database.rollback() @@ -300,13 +368,15 @@ class Importer: #rulog.close() # This is now an internal function that should not be called directly. - def import_file_dict(self, file, site, filter): + def import_file_dict(self, db, file, site, filter, q=None): #print "import_file_dict" if os.path.isdir(file): self.addToDirList[file] = [site] + [filter] return conv = None + (stored, duplicates, partial, errors, ttime) = (0, 0, 0, 0, 0) + # Load filter, process file, pass returned filename to import_fpdb_file print "\nConverting %s" % file @@ -325,7 +395,7 @@ class Importer: if callable(obj): conv = obj(in_path = file, out_path = out_path, index = 0) # Index into file 0 until changeover if(conv.getStatus() and self.NEWIMPORT == False): - (stored, duplicates, partial, errors, ttime) = self.import_fpdb_file(out_path, site) + (stored, duplicates, partial, errors, ttime) = self.import_fpdb_file(db, out_path, site, q) elif (conv.getStatus() and self.NEWIMPORT == True): #This code doesn't do anything yet handlist = hhc.getProcessedHands() @@ -346,11 +416,11 @@ class Importer: return (stored, duplicates, partial, errors, ttime) - def import_fpdb_file(self, file, site): - #print "import_fpdb_file" + def import_fpdb_file(self, db, file, site, q): starttime = time() last_read_hand = 0 loc = 0 + (stored, duplicates, partial, errors, ttime) = (0, 0, 0, 0, 0) #print "file =", file if file == "stdin": inputFile = sys.stdin @@ -377,13 +447,39 @@ class Importer: self.pos_in_file[file] = inputFile.tell() inputFile.close() - #self.database.lock_for_insert() # should be ok when using one thread + (stored, duplicates, partial, errors, ttime, handsId) = self.import_fpdb_lines(db, self.lines, starttime, file, site, q) + + db.commit() + ttime = time() - starttime + if q == None: + print "\rTotal stored:", stored, " duplicates:", duplicates, "errors:", errors, " time:", ttime + + if not stored: + if duplicates: + for line_no in xrange(len(self.lines)): + if self.lines[line_no].find("Game #")!=-1: + final_game_line=self.lines[line_no] + handsId=fpdb_simple.parseSiteHandNo(final_game_line) + else: + print "failed to read a single hand from file:", inputFile + handsId=0 + #todo: this will cause return of an unstored hand number if the last hand was error + self.handsId=handsId + + return (stored, duplicates, partial, errors, ttime) + # end def import_fpdb_file + + + def import_fpdb_lines(self, db, lines, starttime, file, site, q = None): + """Import an fpdb hand history held in the list lines, could be one hand or many""" + + #db.lock_for_insert() # should be ok when using one thread, but doesn't help?? try: # sometimes we seem to be getting an empty self.lines, in which case, we just want to return. - firstline = self.lines[0] + firstline = lines[0] except: # just skip the debug message and return silently: - #print "DEBUG: import_fpdb_file: failed on self.lines[0]: '%s' '%s' '%s' '%s' " %( file, site, self.lines, loc) + #print "DEBUG: import_fpdb_file: failed on lines[0]: '%s' '%s' '%s' '%s' " %( file, site, lines, loc) return (0,0,0,1,0) if firstline.find("Tournament Summary")!=-1: @@ -399,16 +495,18 @@ class Importer: duplicates = 0 #counter partial = 0 #counter errors = 0 #counter + ttime = None + handsId = -1 - for i in xrange (len(self.lines)): - if (len(self.lines[i])<2): #Wierd way to detect for '\r\n' or '\n' + for i in xrange (len(lines)): + if (len(lines[i])<2): #Wierd way to detect for '\r\n' or '\n' endpos=i - hand=self.lines[startpos:endpos] - + hand=lines[startpos:endpos] + if (len(hand[0])<2): hand=hand[1:] - + if (len(hand)<3): pass #TODO: This is ugly - we didn't actually find the start of the @@ -420,10 +518,10 @@ class Importer: self.hand=hand try: - handsId = fpdb_parse_logic.mainParser( self.settings - , self.siteIds[site], category, hand - , self.config, self.database ) - self.database.commit() + handsId = fpdb_parse_logic.mainParser( self.settings, self.siteIds[site] + , category, hand, self.config + , db, q ) + db.commit() stored += 1 if self.callHud: @@ -433,29 +531,29 @@ class Importer: self.caller.pipe_to_hud.stdin.write("%s" % (handsId) + os.linesep) except fpdb_simple.DuplicateError: duplicates += 1 - self.database.rollback() + db.rollback() except (ValueError), fe: errors += 1 self.printEmailErrorMessage(errors, file, hand) if (self.settings['failOnError']): - self.database.commit() #dont remove this, in case hand processing was cancelled. + db.commit() #dont remove this, in case hand processing was cancelled. raise else: - self.database.rollback() + db.rollback() except (fpdb_simple.FpdbError), fe: errors += 1 self.printEmailErrorMessage(errors, file, hand) - self.database.rollback() + db.rollback() if self.settings['failOnError']: - self.database.commit() #dont remove this, in case hand processing was cancelled. + db.commit() #dont remove this, in case hand processing was cancelled. raise if self.settings['minPrint']: if not ((stored+duplicates+errors) % self.settings['minPrint']): - print "stored:", stored, "duplicates:", duplicates, "errors:", errors - + print "stored:", stored, " duplicates:", duplicates, "errors:", errors + if self.settings['handCount']: if ((stored+duplicates+errors) >= self.settings['handCount']): if not self.settings['quiet']: @@ -463,22 +561,8 @@ class Importer: print "Total stored:", stored, "duplicates:", duplicates, "errors:", errors, " time:", (time() - starttime) sys.exit(0) startpos = endpos - ttime = time() - starttime - print "\rTotal stored:", stored, "duplicates:", duplicates, "errors:", errors, " time:", ttime - - if not stored: - if duplicates: - for line_no in xrange(len(self.lines)): - if self.lines[line_no].find("Game #")!=-1: - final_game_line=self.lines[line_no] - handsId=fpdb_simple.parseSiteHandNo(final_game_line) - else: - print "failed to read a single hand from file:", inputFile - handsId=0 - #todo: this will cause return of an unstored hand number if the last hand was error - self.database.commit() - self.handsId=handsId - return (stored, duplicates, partial, errors, ttime) + return (stored, duplicates, partial, errors, ttime, handsId) + # end def import_fpdb_lines def printEmailErrorMessage(self, errors, filename, line): traceback.print_exc(file=sys.stderr) diff --git a/pyfpdb/fpdb_parse_logic.py b/pyfpdb/fpdb_parse_logic.py index fd2d6796..de21439b 100644 --- a/pyfpdb/fpdb_parse_logic.py +++ b/pyfpdb/fpdb_parse_logic.py @@ -25,13 +25,12 @@ from time import time, strftime #parses a holdem hand -def mainParser(settings, siteID, category, hand, config, db = None): - #print "mainparser" - # fdb is not used now - to be removed ... +def mainParser(settings, siteID, category, hand, config, db = None, writeq = None): t0 = time() #print "mainparser" backend = settings['db-backend'] + # Ideally db connection is passed in, if not use sql list if passed in, otherwise start from scratch if db == None: db = Database.Database(c = config, sql = None) category = fpdb_simple.recogniseCategory(hand[0]) @@ -80,7 +79,6 @@ def mainParser(settings, siteID, category, hand, config, db = None): rebuyOrAddon = -1 tourneyTypeId = 1 - fpdb_simple.isAlreadyInDB(db.get_cursor(), gametypeID, siteHandNo) hand = fpdb_simple.filterCrap(hand, isTourney) @@ -177,7 +175,13 @@ def mainParser(settings, siteID, category, hand, config, db = None): , positions, antes, cardValues, cardSuits, boardValues, boardSuits , winnings, rakes, actionTypes, allIns, actionAmounts , actionNos, hudImportData, maxSeats, tableName, seatNos) - result = db.store_the_hand(htw) + + # save hand in db via direct call or via q if in a thread + if writeq == None: + result = db.store_the_hand(htw) + else: + writeq.put(htw) + result = -999 # meaning unknown t9 = time() #print "parse and save=(%4.3f)" % (t9-t0)