Posted by: Radek Wojcik 7 years, 1 month ago
(Comments)
You may refactor these as you wish to use with your favourite db and db connector. If you are sick of sending a bunch of param names and then the leading data filled with question marks, this may be for you.
import re import logging from oursql import ProgrammingError def convert_list_to_utf8(data_list): """Normalize list to utf-8. :param data_list: list object """ new_list = [] for item in data_list: new_list.append(convert_text_to_encoding(item, 'utf-8')) return new_list def run_sql_query(db_conn, sql, params, find_one=False): """ Wraps oursql query for easy execution and cursor handling. Returns the data from the cursor without need to worry about opening and closing cursors. get_sql_results :param find_one: :param sql: the sql query parsed with key word parameters. :param params: the paramter key and values (dict). :param db_conn: the connection to the db, either read or write. :return: list of results or none """ # Perform cleaning of the sql and params list into parameterized format if isinstance(params, dict): clean_sql, clean_params = get_clean_sql_and_params(sql, params) else: clean_sql, clean_params = sql, params logging.debug("Run SQL got sql: {} and params: {}".format(clean_sql, clean_params)) with db_conn.cursor() as cursor: logging.debug("QUERY: {}".format(clean_sql.replace('\n', ''))) logging.debug("PARAMS: {} {}".format(clean_params, type(clean_params))) cursor.execute(clean_sql, clean_params) try: if find_one: result = cursor.fetchone() else: result = cursor.fetchall() except ProgrammingError: # 'no results available' for insert & delete. # lastrowid is None on DELETE or INSERT on table without PK. result = cursor.lastrowid return result def run_sql_transaction(db_write_conn, sql_params_list): """ Can be used to wrap multiple insert executions to perform an sql transaction parameterized sql and params :param db_write_conn: :param sql_params_list: list of sql and params i.e. (('sql1',['p1','p2']), ('sql2',[])) where sql* is a parameterized sql statement and p* are its parameters. :return: """ # Initialize last row id return values, query index last_row_ids, i = [], 0 # Setup the write cursor cursor = db_write_conn.cursor() try: for i, sql_and_params in enumerate(sql_params_list): sql = sql_and_params[0] params = sql_and_params[1] # Perform cleaning of the sql and params list into parameterized format if isinstance(params, dict): # params list, tuple will skip cleaning. sql, params = get_clean_sql_and_params(sql, params) cursor.execute(sql, params) last_row_ids.append(cursor.lastrowid) except: db_write_conn.rollback() logging.critical("SQL Failed to execute param & sql: {}".format(sql_params_list[i])) raise else: db_write_conn.commit() finally: cursor.close() return last_row_ids def get_clean_sql_and_params(sql, params): """ Gets teh clean sql and parameters. :param sql: the sql query parsed with key word parameters. :param params: the paramter key and values (dict). :return: clean sql string, clean list of values (tuple). """ # Gets params from sql in correct order. key_regex = re.compile(r'\{(.+?)\}') keys = re.findall(key_regex, sql) # Safe way to replace each and report on those not replaced. clean_sql = sql values = [] for key in keys: clean_sql = re.sub(r'\{' + key + '\}', r'?', clean_sql) values.append(params[key]) excess_params = re.findall(key_regex, clean_sql) if excess_params: raise ValueError("Parameter(s) found and not replaced in string: {}".format(excess_params)) logging.debug("Got clean sql: {} and param values: {}".format(' '.join(clean_sql.split()), values)) return clean_sql, tuple(convert_list_to_utf8(values)) def generate_sql_insert(table_name, params): """ Will generate VALUES string for a given params dict (locals) and the table name :return: """ # Will not work with NOW() for now, but most basic cases ok. p1 = "INSERT INTO {}".format(table_name) p2 = p3 = "" values = [] for key, value in params.items(): p2 += "{},".format(key) p3 += "?," values.append(value) p2 = p2.strip(',') p3 = p3.strip(',') sql = p1 + "(" + p2 + ") VALUES (" + p3 + ");" return sql, tuple(values) def get_mysql_now(db_conn): """Returns the db server timestamp :param db_conn: """ with db_conn.cursor() as cursor: cursor.execute("SELECT NOW() as now") result = cursor.fetchone().get('now') return result
Now that you have this all defined, pass in your db read connection and you can use it like so:
function select_something(db_conn): name = 'Rad' company = 'Radtek' params = locals() sql = """SELECT * FROM my_table WHERE name={name} AND company={company};""" result = db_utils.run_sql_query(db_conn, sql, params)
And this can handle utf-8 conversion! And it may be a little better than doing something like this every time:
function select_something(db_conn): name = 'Rad' company = 'Radtek' sql = """SELECT * FROM my_table WHERE name = ? AND company = ?;""" try: with db_conn.cursor() as cursor: cursor.execute(sql_status, (name, company)) result = cursor.fetchall() except Exception as e: logging.error(str(e))Share on Twitter Share on Facebook
Comments