Psycopg2 在类中自动重新连接

我有课程可以连接到我的数据库。


import psycopg2, psycopg2.extensions

from parseini import config

import pandas as pd, pandas.io.sql as sqlio



class MyDatabase:

    def __init__(self, name='mydb.ini'):

        self.params = config(filename=name)

        self.my_connection = psycopg2.connect(**self.params)

        self.my_cursor = self.my_connection.cursor()


    def fetch_all_as_df(self, sql_statement):

        return sqlio.read_sql_query(sql_statement, self.my_connection)


    def df_to_sql(self, df):

        table = 'sometable'

        return sqlio.to_sql(df, table, self.my_connection)


    def __del__(self):

        self.my_cursor.close()

        self.my_connection.close()

在我的案例中,如何重新连接到数据库并处理 psycopg2.OperationalError?


函数式编程
浏览 192回答 1
1回答

饮歌长啸

psycopg2.InterfaceError您可以制作一个装饰器,在或psycopg2.OperationalError被提升时尝试重新连接。这只是它如何工作的一个例子,可能需要调整:import timefrom functools import wrapsimport psycopg2, psycopg2.extensionsdef retry(fn):    @wraps(fn)    def wrapper(*args, **kw):        cls = args[0]        for x in range(cls._reconnectTries):            print(x, cls._reconnectTries)            try:                return fn(*args, **kw)            except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:                print ("\nDatabase Connection [InterfaceError or OperationalError]")                print ("Idle for %s seconds" % (cls._reconnectIdle))                time.sleep(cls._reconnectIdle)                cls._connect()    return wrapperclass MyDatabase:    _reconnectTries = 5    _reconnectIdle = 2  # wait seconds before retying    def __init__(self, name='mydb.ini'):        self.my_connection = None        self.my_cursor = None        self.params = config(filename=name)        self._connect()    def _connect(self):        self.my_connection = psycopg2.connect(**self.params)        self.my_cursor = self.my_connection.cursor()    @retry    def fetch_all_as_df(self, sql_statement):        return sqlio.read_sql_query(sql_statement, self.my_connection)    @retry    def dummy(self):        self.my_cursor.execute('select 1+2 as result')        return self.my_cursor.fetchone()    @retry    def df_to_sql(self, df):        table = 'sometable'        return sqlio.to_sql(df, table, self.my_connection)    def __del__(self):        # Maybe there is a connection but no cursor, whatever close silently!        for c in (self.my_cursor, self.my_connection):            try:                c.close()            except:                passdb = MyDatabase()time.sleep(30)  # some time to shutdown the databaseprint(db.dummy())输出:Database Connection [InterfaceError or OperationalError]Idle for 2 secondsDatabase Connection [InterfaceError or OperationalError]Idle for 2 secondsDatabase Connection [InterfaceError or OperationalError]Idle for 2 secondsDatabase Connection [InterfaceError or OperationalError]Idle for 2 seconds(3,)注意:_connect它本身没有修饰,所以这段代码假定初始连接总是有效!
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python