(Comments)

You may refactor these as you wish to use with your favourite db and db connector. If you are tired of writing SQL full of positional question-mark placeholders and then passing the values separately in exactly the matching order, this may be for you.

import re
import logging
from oursql import ProgrammingError

def convert_list_to_utf8(data_list):
    """Return a new list with every item of ``data_list`` converted to utf-8.

    Each item is passed through ``convert_text_to_encoding(item, 'utf-8')``.
    NOTE(review): ``convert_text_to_encoding`` is not defined in this
    snippet; it must be provided by the module that hosts these helpers.

    :param data_list: iterable of values to convert.
    :return: list of converted values, in the same order as the input.
    """
    return [convert_text_to_encoding(entry, 'utf-8') for entry in data_list]

def run_sql_query(db_conn, sql, params, find_one=False):
    """
    Execute one statement on ``db_conn`` and return what the cursor yields.

    The cursor is opened and closed here (via ``with``), so callers only
    deal with the returned data.
    :param db_conn: the connection to the db, either read or write.
    :param sql: the sql statement. When ``params`` is a dict the statement
    uses ``{key}`` placeholders, which are rewritten by
    ``get_clean_sql_and_params``; otherwise it is passed through untouched.
    :param params: dict of placeholder names to values, or an already
    ordered sequence of values matching the statement as written.
    :param find_one: when true use ``fetchone()``, otherwise ``fetchall()``.
    :return: the fetched row(s); if fetching raises ``ProgrammingError``
    the cursor's ``lastrowid`` is returned instead.
    """
    # Start from the caller's input; only dict params need rewriting.
    statement, bound_values = sql, params
    if isinstance(params, dict):
        statement, bound_values = get_clean_sql_and_params(sql, params)

    logging.debug("Run SQL got sql: {} and params: {}".format(statement, bound_values))
    with db_conn.cursor() as cur:
        logging.debug("QUERY: {}".format(statement.replace('\n', '')))
        logging.debug("PARAMS: {} {}".format(bound_values, type(bound_values)))
        cur.execute(statement, bound_values)
        try:
            outcome = cur.fetchone() if find_one else cur.fetchall()
        except ProgrammingError:
            # Raised as 'no results available' for insert & delete.
            # lastrowid is None on DELETE or INSERT on table without PK.
            outcome = cur.lastrowid
    return outcome


def run_sql_transaction(db_write_conn, sql_params_list):
    """
    Run several statements on one cursor as a single transaction.

    Every statement is executed in order. If all succeed the connection is
    committed; if anything raises, the connection is rolled back, the
    failing (sql, params) pair is logged at critical level and the original
    exception is re-raised. The cursor is closed in every case.

    :param db_write_conn: write connection providing ``cursor()``,
    ``commit()`` and ``rollback()``.
    :param sql_params_list: iterable of (sql, params) pairs, i.e.
    (('sql1',['p1','p2']), ('sql2',[])). When params is a dict the sql is
    expected to use ``{key}`` placeholders and is rewritten with
    ``get_clean_sql_and_params``; list/tuple params are passed as given.
    Any iterable works, including generators.
    :return: list with the cursor's ``lastrowid`` after each statement.
    """
    last_row_ids = []
    # Remember the pair being executed so the failure log does not need to
    # index back into sql_params_list (which may not be subscriptable).
    current = None

    # Setup the write cursor
    cursor = db_write_conn.cursor()
    try:
        for current in sql_params_list:
            sql, params = current[0], current[1]
            # Perform cleaning of the sql and params into parameterized format
            if isinstance(params, dict):  # params list, tuple will skip cleaning.
                sql, params = get_clean_sql_and_params(sql, params)
            cursor.execute(sql, params)
            last_row_ids.append(cursor.lastrowid)
    except BaseException:
        # Deliberately broad: roll back on any interruption, then re-raise
        # so the caller still sees the original error.
        db_write_conn.rollback()
        logging.critical("SQL Failed to execute param & sql: {}".format(current))
        raise
    else:
        db_write_conn.commit()
    finally:
        cursor.close()

    return last_row_ids


def get_clean_sql_and_params(sql, params):
    """
    Turn ``{key}`` style sql into ``?`` style sql plus an ordered value tuple.

    Every ``{key}`` occurrence in ``sql`` becomes one ``?`` and contributes
    one value, so a key used twice yields its value twice, in the position
    order of the placeholders. Entries of ``params`` that are not referenced
    by the sql are ignored.

    The substitution is a single pass with the placeholder pattern itself,
    so key names are never interpreted as regular expressions and no
    placeholder can be left behind unreplaced.

    :param sql: the sql query written with ``{key}`` placeholders.
    :param params: the parameter keys and values (dict).
    :return: clean sql string, clean tuple of values (passed through
    ``convert_list_to_utf8``).
    :raises KeyError: if the sql references a key missing from ``params``.
    """
    key_regex = re.compile(r'\{(.+?)\}')

    # Placeholder names in order of appearance give the value order.
    keys = key_regex.findall(sql)
    values = [params[key] for key in keys]

    # Replace every placeholder in one pass.
    clean_sql = key_regex.sub('?', sql)

    logging.debug("Got clean sql: {} and param values: {}".format(' '.join(clean_sql.split()), values))
    return clean_sql, tuple(convert_list_to_utf8(values))


def generate_sql_insert(table_name, params):
    """
    Build a ``?``-parameterized INSERT statement from a dict of column values.

    Column names come from the keys of ``params`` and the bound values from
    its values, both in the dict's iteration order. The table name and the
    column names are interpolated into the sql text as-is.
    Will not work with NOW() for now, but most basic cases ok.

    :param table_name: name of the table to insert into.
    :param params: dict of column name to value (e.g. ``locals()``).
    :return: (sql string, tuple of values)
    """
    pairs = list(params.items())

    # strip(',') is kept so commas at the very edges of the column list are
    # trimmed exactly as before.
    column_sql = ",".join("{}".format(column) for column, _ in pairs).strip(',')
    placeholder_sql = ",".join("?" for _ in pairs)
    bound_values = tuple(value for _, value in pairs)

    head = "INSERT INTO {}".format(table_name)
    statement = "".join([head, "(", column_sql, ") VALUES (", placeholder_sql, ");"])
    return statement, bound_values


def get_mysql_now(db_conn):
    """Return the db server timestamp as reported by ``SELECT NOW()``.

    The fetched row is read with ``.get('now')``, so the connection's
    cursor must return mapping-like rows.
    :param db_conn: connection used to run the query.
    :return: the value of the ``now`` column.
    """
    with db_conn.cursor() as cur:
        cur.execute("SELECT NOW() as now")
        row = cur.fetchone()
        server_time = row.get('now')
    return server_time

Now that you have this all defined, pass in your db read connection and you can use it like so:

def select_something(db_conn):
    """Example: query with named ``{key}`` placeholders filled from locals.

    ``locals()`` is captured right after the values are assigned, so the
    dict holds ``name`` and ``company`` (plus ``db_conn``); the placeholders
    in the sql are looked up in that dict by name.
    NOTE(review): ``db_utils`` is presumably the module holding the helper
    functions shown above; confirm the import name in your project.

    :param db_conn: read connection passed on to ``run_sql_query``.
    :return: whatever ``run_sql_query`` returns for the statement.
    """
    name = 'Rad'
    company = 'Radtek'
    params = locals()

    sql = """SELECT * FROM my_table WHERE name={name} AND company={company};"""

    result = db_utils.run_sql_query(db_conn, sql, params)
    return result

And this can handle utf-8 conversion! And it may be a little better than doing something like this every time:

def select_something(db_conn):
    """Example: the same query written by hand with positional ``?`` params.

    Shows the boilerplate the helper functions remove: the values must be
    passed in exactly the order of the question marks, and the cursor and
    error handling are managed inline.

    :param db_conn: connection whose ``cursor()`` is a context manager.
    :return: the fetched rows, or None if the query raised (the error is
    logged, not re-raised).
    """
    name = 'Rad'
    company = 'Radtek'

    sql = """SELECT * FROM my_table WHERE name = ? AND company = ?;"""

    # Defined up front so there is something to return if the query fails.
    result = None
    try:
        with db_conn.cursor() as cursor:
            cursor.execute(sql, (name, company))
            result = cursor.fetchall()
    except Exception as e:
        logging.error(str(e))
    return result

Currently unrated

Comments