/*****************************************************************************
 * Copyright (C) 2000 Jim Kent.  This source code may be freely used         *
 * for personal, academic, and non-profit purposes.  Commercial use          *
 * permitted only by explicit agreement with Jim Kent (jim_kent@pacbell.net) *
 *****************************************************************************/
/* jksql.c - Stuff to manage interface with SQL database. */

/*
 * Configuration:
 */


#include "common.h"
#include "portable.h"
#include "errAbort.h"
#include <mysql.h>
#include "dlist.h"
#include "dystring.h"
#include "jksql.h"
#include "sqlNum.h"
#include "hgConfig.h"
#include "cheapcgi.h"

/* a function to get mysql results, either mysql_use_result or mysql_store_result */
/* a) mysql_use_result means that after a query, the results are stored on the server and return row-by-row 
 * b) mysql_store_result means that the results are returned together 
 * 
 * a) means less memory, b) longer transfer times (latency of network * number of rows)
 * */

typedef MYSQL_RES *	STDCALL ResGetter(MYSQL *mysql);

#define DEFAULTGETTER mysql_use_result

/* flags controlling sql monitoring facility */
static unsigned monitorInited = FALSE;      /* initialized yet? */
static unsigned monitorFlags = 0;           /* flags indicating what is traced */
static long monitorEnterTime = 0;           /* time current tasked started */
static long long sqlTotalTime = 0;          /* total real milliseconds */
static long sqlTotalQueries = 0;            /* total number of queries */
static boolean monitorHandlerSet = FALSE;   /* is exit handler installed? */
static unsigned traceIndent = 0;            /* how much to indent */
static char *indentStr = "                                                       ";
static boolean sqlParanoid = FALSE;         /* extra squawking */

/* statistics */
static unsigned totalNumConnects = 0;
static unsigned maxNumConnections = 0;

struct sqlProfile
/* a configuration profile for connecting to a server */
{
    struct sqlProfile *next;
    char *name;         // name of profile
    char *host;         // host name for database server
    unsigned int port;  // port for database server
    char *socket;       // unix-domain socket path for database server
    char *user;         // database server user name
    char *password;     // database server password
    char *db;           // database if specified in config
    struct slName *dbs; // database associated with profile, can be NULL.
    // ssl
    char *key;       // path to ssl client key.pem
    char *cert;      // path to ssl client cert.pem
    char *ca;        // path to ssl certificate authority ca.pem
    char *caPath;    // path to directory containing ssl .pem certs (only OpenSSL)
    char *cipher;    // list of permissible ciphers to use
    char *crl;       // path to file containing certificate revocation lists in PEM format
    char *crlPath;   // path to directory containing crl files (only OpenSSL)
    char *verifyServerCert;  // Client will check server cert Subject CN={host}
                             //  Boolean connection flag, if NON-NULL and != "0" then it is on.
    };

struct sqlConnection
/* This is an item on a list of sql open connections. */
    {
    MYSQL *conn;		    /* Connection. Can be NULL if not connected yet. */
    struct sqlProfile *profile;     /* profile, or NULL if not opened via a profile */
    struct dlNode *node;	    /* Pointer to list node. */
    struct dlList *resultList;	    /* Any open results. */
    boolean hasHardLock;	    /* TRUE if table has a non-advisory lock. */
    boolean inCache;                /* debugging flag to indicate it's in a cache */
    boolean isFree;                /* is this connection free for reuse; alway FALSE
                                    * unless managed by a cache */
    int hasTableCache;              /* to avoid repeated checks for cache table name existence, 
                                      -1 if not initialized yet, otherwise like a boolean */
    struct sqlConnection *failoverConn; /* tried if a query fails on the main connection. */
                                    /* Can be NULL. */
    char *db;                       /* to be able to connect later (if conn is NULL), we need to  */
                                    /* store the database */
    };

struct sqlResult
/* This is an item on a list of sql open results. */
    {
    MYSQL_RES *result;			/* Result. */
    struct dlNode *node;		/* Pointer to list node we're on. */
    struct sqlConnection *conn;		/* Pointer to connection. */
    long fetchTime;                     /* cummulative time taken by row fetches for this result */
    };

static struct dlList *sqlOpenConnections = NULL;
static unsigned sqlNumOpenConnections = 0;


char *failoverProfPrefix = "slow-";               // prefix for failover profile of main profile (="slow-db")
static struct hash *profiles = NULL;              // profiles parsed from hg.conf, by name
static struct sqlProfile *defaultProfile = NULL;  // default profile, also in profiles list
static struct hash* dbToProfile = NULL;           // db to sqlProfile

// forward declarations to keep the git diffs clean
static struct sqlResult *sqlUseOrStore(struct sqlConnection *sc,
	char *query, ResGetter *getter, boolean abort);
static boolean sqlConnectIfUnconnected(struct sqlConnection *sc, bool abort);
bool sqlConnMustUseFailover(struct sqlConnection *sc);

static char *envOverride(char *envName, char *defaultVal)
/* look up envName in environment, if it exists and is non-empty, return its
 * value, otherwise return defaultVal */
{
char *val = getenv(envName);
if (isEmpty(val))
    return defaultVal;
else
    return val;
}

char *getDefaultProfileName()
/* Return default profile name, handling initialization if needed */
{
static char *defaultProfileName = NULL;
if (!defaultProfileName)
    defaultProfileName = envOverride("HGDB_PROF", "db"); // name of default profile for main connection
return defaultProfileName;
}

static struct sqlProfile *sqlProfileClone(struct sqlProfile *o)
/* clone profile object (does not include ->dbs) */
{
struct sqlProfile *sp;
AllocVar(sp);
sp->name     = cloneString(o->name);
sp->host     = cloneString(o->host);
sp->port     = o->port;
sp->socket   = cloneString(o->socket);
sp->user     = cloneString(o->user);
sp->password = cloneString(o->password);
sp->db       = cloneString(o->db);
sp->key      = cloneString(o->key);
sp->cert     = cloneString(o->cert);
sp->ca       = cloneString(o->ca);
sp->caPath   = cloneString(o->caPath);
sp->cipher   = cloneString(o->cipher);
sp->crl      = cloneString(o->crl);
sp->crlPath  = cloneString(o->crlPath);
sp->verifyServerCert = cloneString(o->verifyServerCert);
return sp;
}

struct sqlProfile *sqlProfileFromPairs(struct slPair *pairs)
/* create a new profile object (does not include ->dbs) */
{
struct sqlProfile *sp;
AllocVar(sp);
struct slPair *p;
for(p=pairs; p; p=p->next)
    {
    char *value = (char *)p->val;
    if (sameString(p->name,"name"))
	sp->name = cloneString(value);
    if (sameString(p->name,"host"))
	sp->host = cloneString(value);
    if (sameString(p->name,"port"))
	sp->port = atoi(value);
    if (sameString(p->name,"socket"))
	sp->socket = cloneString(value);
    if (sameString(p->name,"user"))
	sp->user = cloneString(value);
    if (sameString(p->name,"password"))
	sp->password = cloneString(value);
    if (sameString(p->name,"db"))
	sp->db = cloneString(value);
    if (sameString(p->name,"key"))
	sp->key = cloneString(value);
    if (sameString(p->name,"cert"))
	sp->cert = cloneString(value);
    if (sameString(p->name,"ca"))
	sp->ca = cloneString(value);
    if (sameString(p->name,"caPath"))
	sp->caPath = cloneString(value);
    if (sameString(p->name,"cipher"))
	sp->cipher = cloneString(value);
    if (sameString(p->name,"crl"))
	sp->crl = cloneString(value);
    if (sameString(p->name,"crlPath"))
	sp->crlPath = cloneString(value);
    if (sameString(p->name,"verifyServerCert"))
	sp->verifyServerCert = cloneString(value);
    }
return sp;
}


static void sqlProfileAssocDb(struct sqlProfile *sp, char *db)
/* associate a db with a profile.  If it is already associated with this
 * profile, don't do anything.*/
{
struct sqlProfile *sp2 = hashFindVal(dbToProfile, db);
if ((sp2 != NULL) && (sp2 != sp))
    errAbort("databases %s already associated with profile %s, trying to associated it with %s",
             db, sp2->name, sp->name);
if (sp2 == NULL)
    {
    hashAdd(dbToProfile, db, sp);
    slSafeAddHead(&sp->dbs, slNameNew(db));
    }
}

static void sqlProfileCreate(struct sqlProfile *sp)
/* create a profile and add to global data structures */
{
hashAdd(profiles, sp->name, sp);
if (sameString(sp->name, getDefaultProfileName()))
    defaultProfile = sp;  // save default
}

static void sqlProfileAddProfIf(char *profileName)
/* check if a config prefix is a profile, and if so, add a
 * sqlProfile object for it if doesn't already exist. */
{
char *host = cfgOption2(profileName, "host");
char *portstr = cfgOption2(profileName, "port");
char *socket = cfgOption2(profileName, "socket");
char *user = cfgOption2(profileName, "user");
char *password = cfgOption2(profileName, "password");
char *db = cfgOption2(profileName, "db");
// ssl
char *key = cfgOption2(profileName, "key");
char *cert = cfgOption2(profileName, "cert");
char *ca = cfgOption2(profileName, "ca");
char *caPath = cfgOption2(profileName, "caPath");
char *cipher = cfgOption2(profileName, "cipher");
char *crl = cfgOption2(profileName, "crl");
char *crlPath = cfgOption2(profileName, "crlPath");
char *verifyServerCert = cfgOption2(profileName, "verifyServerCert");

unsigned int port = 0;

if ((host != NULL) && (user != NULL) && (password != NULL) && (hashLookup(profiles, profileName) == NULL))
    {
    /* for the default profile, allow environment variable override */
    if (sameString(profileName, getDefaultProfileName()))
        {
        host     = envOverride("HGDB_HOST", host);
        portstr  = envOverride("HGDB_PORT", portstr);
        socket   = envOverride("HGDB_SOCKET", socket);
        user     = envOverride("HGDB_USER", user);
        password = envOverride("HGDB_PASSWORD", password);
        db       = envOverride("HGDB_DB", db);
	// ssl
	key      = envOverride("HGDB_KEY", key);
	cert     = envOverride("HGDB_CERT", cert);
	ca       = envOverride("HGDB_CA", ca);
	caPath   = envOverride("HGDB_CAPATH", caPath);
	cipher   = envOverride("HGDB_CIPHER", cipher);
	crl      = envOverride("HGDB_CRL", crl);
	crlPath  = envOverride("HGDB_CRLPATH", crlPath);
	verifyServerCert = envOverride("HGDB_VERIFY_SERVER_CERT", verifyServerCert);
        }

    if (portstr != NULL)
	port = atoi(portstr);

    struct sqlProfile *sp;
    AllocVar(sp);
    sp->name     = cloneString(profileName);
    sp->host     = cloneString(host);
    sp->port     = port;
    sp->socket   = cloneString(socket);
    sp->user     = cloneString(user);
    sp->password = cloneString(password);
    sp->db       = cloneString(db);
    sp->key      = cloneString(key);
    sp->cert     = cloneString(cert);
    sp->ca       = cloneString(ca);
    sp->caPath   = cloneString(caPath);
    sp->cipher   = cloneString(cipher);
    sp->crl      = cloneString(crl);
    sp->crlPath  = cloneString(crlPath);
    sp->verifyServerCert = cloneString(verifyServerCert);
    sqlProfileCreate(sp);
    }
}

static void sqlProfileAddProfs(struct slName *cnames)
/* load the profiles from list of config names */
{
struct slName *cname;
for (cname = cnames; cname != NULL; cname = cname->next)
    {
    char *dot1 = strchr(cname->name, '.'); // first dot in name
    if ((dot1 != NULL) && sameString(dot1, ".host"))
        {
        *dot1 = '\0';
        sqlProfileAddProfIf(cname->name);
        *dot1 = '.';
        }
    }
}

void sqlProfileAddDb(char *profileName, char *db)
/* add a mapping of db to profile.  If database is already associated with
 * this profile, it is ignored.  If it is associated with a different profile,
 * it is an error. */
{
struct sqlProfile *sp = hashFindVal(profiles, profileName);
if (sp == NULL)
    errAbort("can't find profile %s for database %s in hg.conf", profileName, db);
sqlProfileAssocDb(sp, db);
}

static void sqlProfileAddDbs(struct slName *cnames)
/* add mappings of db to profile from ${db}.${profile} entries.
 * would have liked to have automatically added ${profile}.db
 * entries, but backupcentral, etc, would map multiple profiles
 * to a databases, so this is done manually in hdb.c. */
{
struct slName *cname;
for (cname = cnames; cname != NULL; cname = cname->next)
    {
    char *dot1 = strchr(cname->name, '.'); // first dot in name
    if ((dot1 != NULL) && sameString(dot1, ".profile"))
        {
        char *profileName = cfgVal(cname->name);
        *dot1 = '\0';
        sqlProfileAddDb(profileName, cname->name);
        *dot1 = '.';
        }
    }
}

static void sqlProfileLoad(void)
/* load the profiles from config */
{
profiles = hashNew(8);
dbToProfile = hashNew(12);
struct slName *cnames = cfgNames();
sqlProfileAddProfs(cnames);
sqlProfileAddDbs(cnames);
slFreeList(&cnames);
}

static struct sqlProfile* sqlProfileFindByName(char *profileName, char *database)
/* find a profile by name, checking that database matches if found */
{
struct sqlProfile* sp = hashFindVal(profiles, profileName);
if (sp == NULL)
    return NULL;
#if UNUSED // FIXME: this breaks hgHeatMap, enable when logicalDb removed
if ((database != NULL) && (sp->dbs != NULL) && !slNameInList(sp->dbs, database))
    errAbort("attempt to obtain SQL profile %s for database %s, "
             "which is not associate with this database-specific profile",
             profileName, database);
#endif
return sp;
}

static struct sqlProfile* sqlProfileFindByDatabase(char *database)
/* find a profile using database as profile name, return the default if not
 * found */
{
if (!database)
    return defaultProfile;
struct sqlProfile *sp = hashFindVal(dbToProfile, database);
if (sp == NULL)
    sp = defaultProfile;
return sp;
}

static struct sqlProfile* sqlProfileGet(char *profileName, char *database)
/* lookup a profile using the profile resolution algorithm:
 *  - If a profile is specified:
 *     - search hg.conf for the profile, if found:
 *       - if database is specified, then either
 *           - the profile should not specify a database
 *           - the database must match the database in the profile
 *  - If a profile is not specified:
 *     - search hg.conf for a profile with the same name as the database
 *     - if there is no profile named the same as the database, use
 *       the default profile of "db"
 * return NULL if not found.
 */
{
//assert((profileName != NULL) || (database != NULL));
if (profiles == NULL)
    sqlProfileLoad();

if (profileName != NULL)
    return sqlProfileFindByName(profileName, database);
else
    return sqlProfileFindByDatabase(database);
}

static struct sqlProfile* sqlProfileGetFailover(struct sqlProfile* sp, char *database)
/* try to find a failover profile for a profile x or return NULL*/
{
if (sp==NULL || sp->name==NULL)
    return NULL;
char *failoverProfName = catTwoStrings(failoverProfPrefix, sp->name);
struct sqlProfile *failoverProf = sqlProfileGet(failoverProfName, database);
freez(&failoverProfName);
return failoverProf;
}

static struct sqlProfile* sqlProfileMustGet(char *profileName, char *database)
/* lookup a profile using the profile resolution algorithm or die trying */
{
struct sqlProfile* sp = sqlProfileGet(profileName, database);
if (sp == NULL)
    {
    if (profileName == NULL)
        errAbort("can't find mysql connection info for database %s in hg.conf or ~/.hg.conf, should have a default profile named \"db\", so values for at least db.host, "
                "db.user and db.password. See http://genomewiki.ucsc.edu/index.php/Hg.conf", database);
    else if (sameWord(profileName, "backupcentral"))
        errAbort("can't find profile %s in hg.conf. This error most likely indicates that the "
            "Genome Browser could not connect to MySQL/MariaDB. Either the databases server is not running"
            "or the database connection socket indicated in hg.conf is not the one used by your server.", 
            profileName);
    else if (database == NULL)
        errAbort("can't find profile %s in hg.conf", profileName);
    else
        errAbort("can't find profile %s for database %s in hg.conf", profileName, database);
    }
return sp;
}

struct slName* sqlProfileGetNames()
/* Get a list of all profile names. slFreeList result when done */
{
if (profiles == NULL)
    sqlProfileLoad();
struct slName *names = NULL;
struct hashCookie cookie = hashFirst(profiles);
struct hashEl* hel;
while ((hel = hashNext(&cookie)) != NULL)
    slAddHead(&names, slNameNew(hel->name));
return names;
}

static void replaceStr(char **str, char *val)
/* free str and replace with clone new value */
{
freeMem(*str);
*str = cloneString(val);
}

void sqlProfileConfig(struct slPair *pairs)
/* Set configuration for the profile.  This overrides an existing profile in
 * hg.conf or defines a new one.  Results are unpredictable if a connect cache
 * has been established for this profile. */
{
struct sqlProfile *spIn = sqlProfileFromPairs(pairs);
struct sqlProfile *sp = sqlProfileGet(spIn->name, NULL);
if (sp == NULL)
    return sqlProfileCreate(spIn);
replaceStr(&sp->host,     spIn->host);
replaceStr(&sp->socket,   spIn->socket);
sp->port = spIn->port;
replaceStr(&sp->user,     spIn->user);
replaceStr(&sp->password, spIn->password);
replaceStr(&sp->db,       spIn->db);
replaceStr(&sp->key,      spIn->key);
replaceStr(&sp->cert,     spIn->cert);
replaceStr(&sp->ca,       spIn->ca);
replaceStr(&sp->caPath,   spIn->caPath);
replaceStr(&sp->cipher,   spIn->cipher);
replaceStr(&sp->crl,      spIn->crl);
replaceStr(&sp->crlPath,  spIn->crlPath);
replaceStr(&sp->verifyServerCert, spIn->verifyServerCert);
}

void sqlProfileConfigDefault(struct slPair *pairs)
/* Set configuration for the default profile.  This overrides an existing
 * profile in hg.conf or defines a new one.  Results are unpredictable if a
 * connect cache has been established for this profile. */
{
struct slPair *found = slPairFind(pairs, "name");
if (found)
    found->val = cloneString(getDefaultProfileName());
else
    slPairAdd(&pairs, "name", cloneString(getDefaultProfileName()));
sqlProfileConfig(pairs);
}

char *sqlProfileToMyCnf(char *profileName)
/* Read in profile named, 
 * and create a multi-line setting string usable in my.cnf files.  
 * Return Null if profile not found. */
{
struct sqlProfile *sp = sqlProfileGet(profileName, NULL);
if (!sp)
    return NULL;
struct dyString *dy = dyStringNew(256);
if (sp->host)
    dyStringPrintf(dy, "host=%s\n", sp->host);
if (sp->user)
    dyStringPrintf(dy, "user=%s\n", sp->user);
if (sp->password)
    dyStringPrintf(dy, "password=%s\n", sp->password);
if (sp->db)
    dyStringPrintf(dy, "database=%s\n", sp->db);
if (sp->port)
    dyStringPrintf(dy, "port=%d\n", sp->port);
if (sp->socket)
    dyStringPrintf(dy, "socket=%s\n", sp->socket);
if (sp->key)
    dyStringPrintf(dy, "ssl-key=%s\n", sp->key);
if (sp->cert)
    dyStringPrintf(dy, "ssl-cert=%s\n", sp->cert);
if (sp->ca)
    dyStringPrintf(dy, "ssl-ca=%s\n", sp->ca);
if (sp->caPath)
    dyStringPrintf(dy, "ssl-capath=%s\n", sp->caPath);
if (sp->cipher)
    dyStringPrintf(dy, "ssl-cipher=%s\n", sp->cipher);
#if (MYSQL_VERSION_ID >= 50603) // mysql version "5.6.3"
    if (sp->crl)
	dyStringPrintf(dy, "ssl-crl=%s\n", sp->crl);
    if (sp->crlPath)
	dyStringPrintf(dy, "ssl-crlpath=%s\n", sp->crlPath);
#endif
if (sp->verifyServerCert && !sameString(sp->verifyServerCert,"0"))
    dyStringPrintf(dy, "ssl-verify-server-cert\n");
return dyStringCannibalize(&dy);
}


static void monitorInit(void)
/* initialize monitoring on the first call */
{
unsigned flags = 0;
char *val;

/* there is special code in cheap.cgi to pass these from cgiOption to env */

val = getenv("JKSQL_TRACE");
if ((val != NULL) && sameString(val, "on"))
    flags |= JKSQL_TRACE;
val = getenv("JKSQL_PROF");
if ((val != NULL) && sameString(val, "on"))
    flags |= JKSQL_PROF;
if (flags != 0)
    sqlMonitorEnable(flags);

monitorInited = TRUE;
}

static void monitorEnter(void)
/* called at the beginning of a routine that is monitored, initialize if
 * necessary and start timing if enabled */
{
if (!monitorInited)
    monitorInit();
assert(monitorEnterTime == 0);  /* no recursion allowed */
if (monitorFlags)
    {
    monitorEnterTime = clock1000();
    }
}

static long monitorLeave(void)
/* called at the end of a routine that is monitored, updates time count.
 * returns time since enter. */
{
long deltaTime = 0;
if (monitorFlags)
    {
    deltaTime = clock1000() - monitorEnterTime;
    assert(monitorEnterTime > 0);
    if (monitorFlags & JKSQL_PROF)
        sqlTotalTime += deltaTime;
    monitorEnterTime = 0;
    }
return deltaTime;
}

static char *scConnDb(struct sqlConnection *sc)
/* Return sc->db, unless it is NULL -- if NULL, return a string for
 * fprint'd messages. */
{
return (sc->db ? sc->db : "?");
}

static char *scConnProfile(struct sqlConnection *sc)
/* Return sc->profile->name, unless profile is NULL -- if NULL, return a string for
 * fprint'd messages. */
{
return (sc->profile ? sc->profile->name : "<noProfile>");
}

static void monitorPrintInfo(struct sqlConnection *sc, char *name)
/* print a monitor message, with connection id and databases. */
{
long int threadId = 0;
if (sc->conn)
    threadId = sc->conn->thread_id;
fprintf(stderr, "%.*s%s %ld %s\n", traceIndent, indentStr, name,
        threadId, scConnDb(sc));
fflush(stderr);
}

static void monitorPrint(struct sqlConnection *sc, char *name,
                         char *format, ...)
/* print a monitor message, with connection id, databases, and
 * printf style message.*/
{
if (!(monitorFlags & JKSQL_TRACE))
    return;
va_list args;
long int threadId = 0;
if (sc->conn)
    threadId = sc->conn->thread_id;
fprintf(stderr, "%.*s%s %ld %s %s ", traceIndent, indentStr, name,
        threadId, sqlGetHost(sc), scConnDb(sc));
va_start(args, format);
vfprintf(stderr, format, args);
va_end(args);
fputc('\n', stderr);
fflush(stderr);
}

static void monitorPrintTime(void)
/* print total time */
{
/* only print if not explictly disabled */
if (monitorFlags & JKSQL_PROF)
    {
    fprintf(stderr, "%.*sSQL_TOTAL_TIME %0.3fs\n", traceIndent, indentStr,
            ((double)sqlTotalTime)/1000.0);
    fprintf(stderr, "%.*sSQL_TOTAL_QUERIES %ld\n", traceIndent, indentStr,
            sqlTotalQueries);
    fflush(stderr);
    }
}

static void monitorPrintQuery(struct sqlConnection *sc, char *query)
/* print a query, replacing newlines with \n */
{
char *cleaned = replaceChars(query, "\n", "\\n");
monitorPrint(sc, "SQL_QUERY", "%s", cleaned);
freeMem(cleaned);
}

void sqlMonitorEnable(unsigned flags)
/* Enable disable tracing or profiling of SQL queries.
 * If JKSQL_TRACE is specified, then tracing of each SQL query is enabled,
 * along with the timing of the queries.
 * If JKSQL_PROF is specified, then time spent in SQL queries is logged
 * and printed when the program exits or when sqlMonitorDisable is called.
 *
 * These options can also be enabled by setting the JKSQL_TRACE and/or
 * JKSQL_PROF environment variables to "on".  The cheapcgi module will set
 * these environment variables if the corresponding CGI variables are set
 * to "on".  These may also be set in the .hg.conf file.  While this method
 * of setting these parameters is a bit of a hack, it avoids uncessary
 * dependencies.
 */
{
monitorFlags = flags;

if ((monitorFlags & JKSQL_PROF) && !monitorHandlerSet)
    {
    /* only add once */
    atexit(monitorPrintTime);
    monitorHandlerSet = TRUE;
    }

monitorInited = TRUE;
}

void sqlMonitorSetIndent(unsigned indent)
/* set the sql indent level indent to the number of spaces to indent each
 * trace, which can be helpful in making voluminous trace info almost
 * readable. */
{
traceIndent = indent;
}

void sqlMonitorDisable(void)
/* Disable tracing or profiling of SQL queries. */
{
if (monitorFlags & JKSQL_PROF)
    monitorPrintTime();

monitorFlags = 0;
sqlTotalTime = 0;  /* allow reenabling */
sqlTotalQueries = 0;
}

void sqlFreeResult(struct sqlResult **pRes)
/* Free up a result. */
{
struct sqlResult *res = *pRes;
if (res != NULL)
    {
    if (monitorFlags & JKSQL_TRACE)
        monitorPrint(res->conn, "SQL_FETCH", "%0.3fs", ((double) res->fetchTime)/1000.0);
    if (res->result != NULL)
        {
        monitorEnter();
	mysql_free_result(res->result);
        monitorLeave();
        }
    if (res->node != NULL)
	{
	dlRemove(res->node);
	freeMem(res->node);
	}
    freez(pRes);
    }
}

void sqlDisconnect(struct sqlConnection **pSc)
/* Close down connection. */
{
struct sqlConnection *sc = *pSc;
long deltaTime;

if (sc != NULL)
    {
    if (sc->inCache)
        errAbort("sqlDisconnect called on connection associated with a cache");
    assert(!sc->isFree);
    MYSQL *conn = sc->conn;
    struct dlList *resList = sc->resultList;
    struct dlNode *node = sc->node;
    if (resList != NULL)
	{
	struct dlNode *resNode, *resNext;
	for (resNode = resList->head; resNode->next != NULL; resNode = resNext)
	    {
	    struct sqlResult *res = resNode->val;
	    resNext = resNode->next;
	    sqlFreeResult(&res);
	    }
	freeDlList(&resList);
	}
    if (conn != NULL)
	{
	if (sc->hasHardLock)
	    sqlHardUnlockAll(sc);
        if (monitorFlags & JKSQL_TRACE)
            monitorPrintInfo(sc, "SQL_DISCONNECT");
        monitorEnter();
	mysql_close(conn);

	deltaTime = monitorLeave();
	if (monitorFlags & JKSQL_TRACE)
	    monitorPrint(sc, "SQL_TIME", "%0.3fs", ((double)deltaTime)/1000.0);
	}
    if (node != NULL)
	{
	dlRemove(node);
	freeMem(node);
	}
   
    freeMem(sc->db);
    // also close failover connection
    if (sc->failoverConn != NULL)
        sqlDisconnect(&sc->failoverConn);

    freez(pSc);
    sqlNumOpenConnections--;
    }
    
        
}

char* sqlGetDatabase(struct sqlConnection *sc)
/* Get the database associated with an connection. Warning: return may be NULL! */
{
return sc->db;
}

char* sqlGetHost(struct sqlConnection *sc)
/* Get the host associated with a connection or NULL. */
{
if (sc->conn)
    return sc->conn->host;
if (sc->profile->host)
    return sc->profile->host;
return NULL;
}

struct slName *sqlGetAllDatabase(struct sqlConnection *sc)
/* Get a list of all database on the server */
{
char query[32];
sqlSafef(query, sizeof query, "show databases");
struct sqlResult *sr = sqlGetResult(sc, query);
char **row;
struct slName *databases = NULL;
while ((row = sqlNextRow(sr)) != NULL)
    {
    if (!startsWith("mysql", row[0]))  /* Avoid internal databases. */
        slSafeAddHead(&databases, slNameNew(row[0]));
    }
sqlFreeResult(&sr);
return databases;
}


static bool sqlTableExistsOnMain(struct sqlConnection *sc, char *tableName)
/* Return TRUE if the table can be queried using sc's main conn;
 * don't check failoverConn or the table cache (showTableCache in hg.conf). */
{
// if the whole db does not exist on the main server, then the table is certainly not there
if (sqlConnMustUseFailover(sc))
    return FALSE;

char query[1024];
sqlCkIl(tableNameSafe,tableName)
//char tableNameSafe[strlen(tableName)+9+1]; sqlCheckIdentifiersList(tableNameSafe, sizeof tableNameSafe, tableName);
sqlSafef(query, sizeof(query), "SELECT 1 FROM %-s LIMIT 0", tableNameSafe);  
struct sqlResult *sr;

// temporarily remove failover connection, we don't want the failover switch here
struct sqlConnection *failoverConn = sc->failoverConn; 
sc->failoverConn=NULL;
sr = sqlUseOrStore(sc, query, DEFAULTGETTER, FALSE);
sc->failoverConn=failoverConn;

bool ret = FALSE;
if (sr!=NULL)
    {
    monitorPrint(sc, "SQL_TABLE_EXISTS", "%s", tableName);
    ret = TRUE;
    sqlFreeResult(&sr);
    }
else
    monitorPrint(sc, "SQL_TABLE_NOT_EXISTS", "%s", tableName);
return ret;
}


static struct sqlConnection *sqlTableCacheFindConn(struct sqlConnection *conn)
/* Check if table name caching is configured and the cache table is also present 
 * on the server of the connection. Returns the connection or NULL */
{
char *tableListTable = cfgOption("showTableCache");
if (tableListTable == NULL) 
    return NULL;

// to avoid hundreds of repeated table existence checks, we keep the result
// of sqlTableCacheFindConn in the sqlConn object
if (conn->hasTableCache==-1) // -1 => undefined
    {
    conn->hasTableCache = (int) sqlTableExistsOnMain(conn, tableListTable);
    if (conn->failoverConn && !conn->hasTableCache)
        {
        monitorPrint(conn, "SQL_FAILOVER_NO_TABLE_CACHE_FOR_DB", "%s", conn->db);
        return NULL;
        }
    }

if (conn->hasTableCache)
    {
    monitorPrint(conn, "SQL_FOUND_TABLE_CACHE", "%s", tableListTable);
    return conn;
    }
else
    {
    monitorPrint(conn, "SQL_NOT_FOUND_TABLE_CACHE", "%s", tableListTable);
    return NULL;
    }
}

static bool sqlTableCacheTableExists(struct sqlConnection *conn, char *maybeTable)
/* check if table exists in table name cache */
// (see redmine 3780 for some historical background on this caching)
{
char query[1024];
char *tableListTable = cfgVal("showTableCache");
char table[2048];
safecpy(table, sizeof table, maybeTable);
char *dot = strchr(table, '.');
if (dot)
    {
    *dot = 0;
    sqlSafef(query, sizeof(query), "SELECT count(*) FROM %s.%s WHERE tableName='%s'", table, tableListTable, dot+1);
    }
else
    sqlSafef(query, sizeof(query), "SELECT count(*) FROM %s WHERE tableName='%s'", tableListTable, table);
return (sqlQuickNum(conn, query)!=0);
}

static struct slName *sqlTableCacheQuery(struct sqlConnection *conn, char *likeExpr)
/* This function queries the tableCache table. It is used by the sqlTableList 
 * function, so it doe not have to connect to the main sql server just to get a list of table names.
 * Returns all table names from the table name cache as a list. 
 * Can optionally filter with a likeExpr e.g. "LIKE snp%". */
{
char *tableList = cfgVal("showTableCache");
struct slName *list = NULL, *el;
char query[1024];
// mysql SHOW TABLES is sorted alphabetically by default
if (likeExpr==NULL)
    sqlSafef(query, sizeof(query), "SELECT DISTINCT tableName FROM %s ORDER BY tableName", tableList);
else
    sqlSafef(query, sizeof(query), 
        "SELECT DISTINCT tableName FROM %s WHERE tableName LIKE '%s' ORDER BY tableName", tableList, likeExpr);

struct sqlResult *sr = sqlGetResult(conn, query);
char **row;
while ((row = sqlNextRow(sr)) != NULL)
    {
    el = slNameNew(row[0]);
    slAddHead(&list, el);
    }
slReverse(&list);
sqlFreeResult(&sr);
return list;
}

static struct slName *sqlListTablesForConn(struct sqlConnection *conn, char *likeExpr)
/* run SHOW TABLES on connection and return a slName list.  LIKE expression
 * can be NULL or string e.g. "LIKE 'snp%'" */
{
char query[256];
if (likeExpr == NULL)
    sqlSafef(query, sizeof(query), "SHOW TABLES");
else
    sqlSafef(query, sizeof(query), "SHOW TABLES LIKE '%s'", likeExpr);

struct slName *list = NULL, *el;

struct sqlResult *sr;
char **row;
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    {
    el = slNameNew(row[0]);
    slAddHead(&list, el);
    }
slReverse(&list);
sqlFreeResult(&sr);
return list;
}

struct slName *sqlListTablesLike(struct sqlConnection *conn, char *likeExpr)
/* Return list of tables in database associated with conn. Optionally filter list with
 * given LIKE expression that can be NULL or string e.g. "LIKE 'snp%'". */
{
struct slName *list = NULL;

struct sqlConnection *cacheConn = sqlTableCacheFindConn(conn);

if (cacheConn)
    list = sqlTableCacheQuery(cacheConn, likeExpr);
else
    list = sqlListTablesForConn(conn, likeExpr);

if (conn->failoverConn != NULL)
    {
    struct slName *failoverList = sqlListTablesForConn(conn->failoverConn, likeExpr);
    slSortMergeUniq(&list, failoverList, slNameCmp, slNameFree);
    }

return list;
}

struct slName *sqlListTables(struct sqlConnection *sc)
/* Return list of tables in database associated with conn. */
{
return sqlListTablesLike(sc, NULL);
}

struct sqlResult *sqlDescribe(struct sqlConnection *conn, char *table)
/* run the sql DESCRIBE command or get a cached table description and return the sql result */
{
char query[1024], cacheQuery[1024];
struct sqlResult *sr;

struct sqlConnection *cacheConn = sqlTableCacheFindConn(conn);
sqlSafef(query, sizeof(query), "DESCRIBE %s", table);

if (cacheConn)
    {
    char *tableListTable = cfgVal("showTableCache");
    sqlSafef(cacheQuery, sizeof(cacheQuery), "SELECT Field, Type, NullAllowed, isKey, hasDefault, Extra FROM %s WHERE tableName='%s'", \
        tableListTable, table);
    conn = cacheConn;
    // check that entries actually exist in the cached table descriptions, otherwise
    // use the default query
    if (sqlQuickString(conn, cacheQuery) != NULL)
        {
        sr = sqlGetResult(conn, cacheQuery);
        return sr;
        }
    }
sr = sqlGetResult(conn, query);
return sr;
}

struct slName *sqlListFields(struct sqlConnection *conn, char *table)
/* Return list of fields in table. */
{
char **row;
struct slName *list = NULL, *el;
struct sqlResult *sr = NULL;
sr = sqlDescribe(conn, table);
while ((row = sqlNextRow(sr)) != NULL)
    {
    el = slNameNew(row[0]);
    slAddHead(&list, el);
    }
sqlFreeResult(&sr);
slReverse(&list);
return list;
}

void sqlAddDatabaseFields(char *database, struct hash *hash)
/* Add fields from the one database to hash. */
{
struct sqlConnection *conn = sqlConnect(database);
struct slName *table, *tableList = sqlListTables(conn);
struct sqlResult *sr;
char **row;
char fullName[512];
for (table = tableList; table != NULL; table = table->next)
    {
    sr = sqlDescribe(conn, table->name);
    while ((row = sqlNextRow(sr)) != NULL)
	{
	safef(fullName, sizeof(fullName), "%s.%s.%s",
	    database, table->name, row[0]);
	hashAdd(hash, fullName, NULL);
	}
    sqlFreeResult(&sr);
    }
slFreeList(&tableList);
sqlDisconnect(&conn);
}

struct hash *sqlAllFields(void)
/* Get hash of all fields in database.table.field format.  */
{
struct hash *fullHash = hashNew(18);
struct hash *dbHash = sqlHashOfDatabases();
struct hashEl *dbList, *db;
dbList = hashElListHash(dbHash);
for (db = dbList; db != NULL; db = db->next)
    sqlAddDatabaseFields(db->name, fullHash);
slFreeList(&dbList);
hashFree(&dbHash);
return fullHash;
}


void sqlCleanupAll(void)
/* Cleanup all open connections and resources. */
{
if (sqlOpenConnections)
    {
    struct dlNode *conNode, *conNext;
    struct sqlConnection *conn;
    for (conNode = sqlOpenConnections->head; conNode->next != NULL; conNode = conNext)
	{
	conn = conNode->val;
	conNext = conNode->next;
        conn->inCache = FALSE; // really should be cleaning up caches too
        conn->isFree = FALSE;
	sqlDisconnect(&conn);
	}
    freeDlList(&sqlOpenConnections);
    }
}

static void sqlInitTracking(void)
/* Initialize tracking and freeing of resources. */
{
if (sqlOpenConnections == NULL)
    {
    sqlOpenConnections = newDlList();
    atexit(sqlCleanupAll);
    }
}

static bool sqlIsUcscServer()
/* Return TRUE if this is one of our own servers at UCSC */
{
// partially copied from hdb.c, but avoids dependendcy on hdb.c
char *httpHost = getenv("HTTP_HOST");
if (httpHost==NULL) // this is not running as a CGI, but a command-line program
    return FALSE;
return (containsStringNoCase(httpHost, ".ucsc.edu")!=NULL);
}

static struct sqlConnection *sqlConnRemoteFillIn(struct sqlConnection *sc, 
					   struct sqlProfile *sp,
                                           char *database, boolean abort, boolean addAsOpen)
/* Fill the sqlConnection object: Connect to database somewhere as somebody.
 * Database maybe NULL to just connect to the server.  If abort is set display
 * error message and abort on error. This is the core function that connects to
 * a MySQL server. */
{
MYSQL *conn;
long deltaTime;

sqlInitTracking();

sc->resultList = newDlList();

if (addAsOpen)
    sc->node = dlAddValTail(sqlOpenConnections, sc);

long oldTime = monitorEnterTime;
monitorEnterTime = 0;
monitorEnter();

if ((sc->conn = conn = mysql_init(NULL)) == NULL)
    // no need for monitorLeave here
    errAbort("Couldn't connect to mySQL.");
// Fix problem where client LOCAL setting is disabled by default for security
mysql_options(conn, MYSQL_OPT_LOCAL_INFILE, NULL);

// Boolean option to tell client to verify that the host server certificate Subject CN equals the hostname.
// If turned on this can defeat Man-In-The-Middle attacks.
if (sp->verifyServerCert && !sameString(sp->verifyServerCert,"0"))
    {
    #if !defined(MARIADB_VERSION_ID) && MYSQL_VERSION_ID >= 80000
    mysql_options(conn, MYSQL_OPT_SSL_MODE, SSL_MODE_REQUIRED);
    #else
    my_bool flag = TRUE;
    mysql_options(conn, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, &flag);
    #endif
    }

#if (MYSQL_VERSION_ID >= 50603) // mysql version "5.6.3"
    // If certificate revocation list file provided, set mysql option
    if (sp->crl)
	mysql_options(conn, MYSQL_OPT_SSL_CRL, &sp->crl);

    // If path to directory with crl files provided, set mysql option
    if (sp->crlPath)
	mysql_options(conn, MYSQL_OPT_SSL_CRLPATH, &sp->crlPath);
#endif

if (sp->key || sp->cert || sp->ca || sp->caPath || sp->cipher)
    mysql_ssl_set(conn, sp->key, sp->cert, sp->ca, sp->caPath, sp->cipher); 

if (mysql_real_connect(
	conn,
	sp->host, /* host */
	sp->user,	/* user name */
	sp->password,	/* password */
	database, /* database */
	sp->port,	/* port */
	sp->socket,	/* socket */
	0)	/* flags */  == NULL)
    {
    monitorLeave();
    monitorEnterTime = oldTime;

    char *extraMsg = "";
    if (sqlIsUcscServer())
        extraMsg = "We hate this error more than any other and may be already looking into it. You can help us by telling us "
            "about it, "
            "the email is genome-www@soe.ucsc.edu. We will fix it ASAP. "
            "And even if this server is failing right now, usually, one of our other three "
            "international mirrors is still "
            "working. The three mirrors are genome.ucsc.edu (US), genome-euro.ucsc.edu (Germany), genome-asia.ucsc.edu "
            "(Japan). You may not find your custom tracks and saved sessions there, but using another mirror should allow "
            "continuing your work while we are fixing the problem. ";

    if (abort)
	errAbort("Couldn't connect to database %s on %s as %s.\n%s\n%s",
	    database, sp->host, sp->user, mysql_error(conn), extraMsg);
    else if (sqlParanoid)
	fprintf(stderr, "Couldn't connect to database %s on %s as %s.  "
		"mysql: %s  pid=%ld\n",
		database, sp->host, sp->user, mysql_error(conn), (long)getpid());
    return NULL;
    }

/* Make sure the db is correct in the connect, think usually happens if there
 * is a mismatch between MySQL library and code.  If this happens, please
 * figure out what is going on.  Contact markd if you need help. */
if (((conn->db != NULL) && !sameString(database, conn->db))
   || ((conn->db == NULL) && (database != NULL)))
   errAbort("apparent mismatch between mysql.h used to compile jksql.c and libmysqlclient");

sc->db=cloneString(database);
if (monitorFlags & JKSQL_TRACE)
    monitorPrint(sc, "SQL_CONNECT", "%s %s", sp->host, sp->user);

deltaTime = monitorLeave();
if (monitorFlags & JKSQL_TRACE)
    monitorPrint(sc, "SQL_TIME", "%0.3fs", ((double)deltaTime)/1000.0);
monitorEnterTime = oldTime;

sqlNumOpenConnections++;
if (sqlNumOpenConnections > maxNumConnections)
    maxNumConnections = sqlNumOpenConnections;
totalNumConnects++;

sc->hasTableCache=-1; // -1 => not determined 
return sc;
}

static struct sqlConnection *sqlConnRemote(struct sqlProfile* sp, char *database, boolean abort)
/* Connect to database somewhere as somebody. Database maybe NULL to just
 * connect to the server.  If abort is set display error message and abort on
 * error. */
{
struct sqlConnection *sc;
AllocVar(sc);
return sqlConnRemoteFillIn(sc, sp, database, abort, TRUE);
}

struct sqlConnection *sqlConnectRemote(char *host, char *user, char *password,
                                       char *database)
/* Connect to database somewhere as somebody. Database maybe NULL to
 * just connect to the server. Abort on error. 
 * This only takes limited connection parameters. Use Full version for access to all.*/
{
struct sqlProfile *sp;
AllocVar(sp);
sp->host = cloneString(host);
sp->user = cloneString(user);
sp->password = cloneString(password);
return sqlConnRemote(sp, database, TRUE);
}


struct sqlConnection *sqlMayConnectRemote(char *host, char *user, char *password,
                                          char *database)
/* Connect to database somewhere as somebody. Database maybe NULL to
 * just connect to the server.  Return NULL if can't connect. 
 * This only takes limited connection parameters. Use Full version for access to all.*/
{
struct sqlProfile *sp;
AllocVar(sp);
sp->host = cloneString(host);
sp->user = cloneString(user);
sp->password = cloneString(password);
return sqlConnRemote(sp, database, FALSE);
}

struct sqlConnection *sqlConnectRemoteFull(struct slPair *pairs, char *database)
/* Connect to database somewhere as somebody. Database maybe NULL to
 * just connect to the server. Abort on error. 
 * Connection parameter pairs contains a list of name/values. */
{
struct sqlProfile *sp = sqlProfileFromPairs(pairs);
return sqlConnRemote(sp, database, TRUE);
}

struct sqlConnection *sqlMayConnectRemoteFull(struct slPair *pairs, char *database)
/* Connect to database somewhere as somebody. Database maybe NULL to
 * just connect to the server.  
 * Connection parameter pairs contains a list of name/values. Return NULL if can't connect.*/
{
struct sqlProfile *sp = sqlProfileFromPairs(pairs);
return sqlConnRemote(sp, database, FALSE);
}


static struct sqlConnection *sqlUnconnectedConn(struct sqlProfile* profile, char* database)
/* create a sqlConnection object that has all information to connect but is actually
 * not connected yet, as indicated by a NULL mysql connection pointer */
{
static struct sqlConnection *sc;
AllocVar(sc);
sc->conn = NULL;
sc->profile = profile; // remember the profile, needed to connect later
sc->db = cloneString(database);
sc->hasTableCache = -1; // -1 => undefined
return sc;
}

static struct sqlConnection *sqlConnProfile(struct sqlProfile* sp, char *database, boolean abort)
/* Connect to database using the profile.  Database maybe NULL to connect to
 * the server. Optionally abort on failure. */
{
if (monitorFlags & JKSQL_TRACE)
    fprintf(stderr, "SQL_CONNECT_PROFILE %s %s %s\n", sp->name, sp->host, database);
bool mainAbort = abort;
struct sqlConnection *sc;

// get the failover profile for the profile, if it exists
struct sqlProfile *failoverProf = sqlProfileGetFailover(sp, database);
// if we have a failover profile, don't abort right away
if (failoverProf!=NULL)
    mainAbort = FALSE;

// connect with the default profile
sc = sqlConnRemote(sp, database, mainAbort);
if (failoverProf==NULL)
    // the default case, without a failover connection: just return sc, can be NULL
    return sc;

// local-only databases must never use the failover connection
// The alternative would be to not use a failover connection for any database that does not have a
// a tableList table, but then if the UCSC admins ever forget to create
// tableList tables, there would be no error and users would simply not see the
// remote tables anymore. We prefer a clear config statement where the local
// admin has to list the databases that do not exist on the public mysql
// server.
// Another alternative would be keep the main connection "hanging" (see below), but that would
// cost time at some point later. It's fastest to never connect at all for local-only assemblies.
char cfgName[255];
safef(cfgName, sizeof(cfgName), "%s.excludeDbs", failoverProf->name);
char *failOverExclude = cfgOption(cfgName);
if (failOverExclude)
{
    struct slName *noFoDbs = slNameListFromString(failOverExclude, ',');
    if (slNameInList(noFoDbs, database))
    {
        fprintf(stderr, "SQL_CONNECT_IS_EXCLUDED %s\n", database);
        slNameFree(noFoDbs);
        return sc;
    }
}
// if the requested database exists only on the failover connection, then the main connect 
// failed. We just connect again without a database, and store the database name
if (sc==NULL)
    {
    if (monitorFlags & JKSQL_TRACE)
        fprintf(stderr, "SQL_CONNECT_MAIN_FAIL %s\n", database);
    sc = sqlConnRemote(sp, NULL, TRUE);
    sc->db = cloneString(database);
    }

sc->profile = sp; // remember the profile

// don't connect the failOver connection yet: lazily connect later when needed
sc->failoverConn = sqlUnconnectedConn(failoverProf, database);
return sc;
}

struct sqlConnection *sqlMayConnect(char *database)
/* Connect to database on default host as default user.
 * Return NULL (don't abort) on failure. */
{
return sqlConnProfile(sqlProfileMustGet(NULL, database), database, FALSE);
}

static boolean sqlConnectIfUnconnected(struct sqlConnection *sc, bool abort)
/* Take a yet unconnected sqlConnection object and connect it to the sql server. 
 * returns TRUE on success, FALSE otherwise. 
 * This allows us to have mysql connection objects with a server name, port,
 * database etc, but no actual mysql connection setup yet. The connection is
 * only done when a query comes in. This saves a lot of time, as the failover
 * connection object is just tracking the database changes on the main
 * connection, and connects only when really necessary.  */
{
if (!sc)
    return FALSE;

if (sc->conn!=NULL)
    return TRUE;
char *profName = NULL;
if (sc->profile)
    profName = sc->profile->name;
struct sqlProfile *sp = sqlProfileMustGet(profName, sc->db);
return (sqlConnRemoteFillIn(sc, sp, sc->db, abort, FALSE) != NULL);
}

struct sqlConnection *sqlConnect(char *database)
/* Connect to database on default host as default user. */
{
struct sqlProfile *defProf = sqlProfileMustGet(NULL, database);
return sqlConnProfile(defProf, database, TRUE);
}

struct sqlConnection *sqlConnectProfile(char *profileName, char *database)
/* Connect to profile or database using the specified profile.  Can specify
 * profileName, database, or both. The profile is the prefix to the host,
 * user, and password variables in .hg.conf.  For the default profile of "db",
 * the environment variables HGDB_HOST, HGDB_USER, and HGDB_PASSWORD can
 * override.
 */
{
struct sqlProfile* sp = sqlProfileMustGet(profileName, database);
return sqlConnRemote(sp, database, TRUE);
}

struct sqlConnection *sqlMayConnectProfile(char *profileName, char *database)
/* Connect to profile or database using the specified profile. Can specify
 * profileName, database, or both. The profile is the prefix to the host,
 * user, and password variables in .hg.conf.  For the default profile of "db",
 * the environment variables HGDB_HOST, HGDB_USER, and HGDB_PASSWORD can
 * override.  Return NULL if connection fails or profile is not found.
 */
{
struct sqlProfile* sp = sqlProfileGet(profileName, database);
if (sp == NULL)
    return NULL;
return sqlConnRemote(sp, database, FALSE);
}

void sqlVaWarn(struct sqlConnection *sc, char *format, va_list args)
/* Default error message handler. */
{
MYSQL *conn = sc->conn;
if (format != NULL) {
    vaWarn(format, args);
    }
warn("mySQL error %d: %s (profile=%s, host=%s, db=%s)", mysql_errno(conn), 
    mysql_error(conn), scConnProfile(sc), sqlGetHost(sc), scConnDb(sc));
}

void sqlWarn(struct sqlConnection *sc, char *format, ...)
/* Printf formatted error message that adds on sql
 * error message. */
{
va_list args;
va_start(args, format);
sqlVaWarn(sc, format, args);
va_end(args);
}

void sqlAbort(struct sqlConnection  *sc, char *format, ...)
/* Printf formatted error message that adds on sql
 * error message and abort. */
{
va_list args;
va_start(args, format);
sqlVaWarn(sc, format, args);
va_end(args);
noWarnAbort();
}

struct sqlConnection *sqlFailoverConn(struct sqlConnection *sc)
/* Returns the failover connection of a connection or NULL.
 * (Needed because the sqlConnection is not in the .h file) */
{
return sc->failoverConn;
}

bool sqlConnMustUseFailover(struct sqlConnection *sc)
/* Returns true if a connection has a failover connection and 
 * the current db does not exist on the main connection.
*/
{
// a db that is different between the sqlConnection object and mysql means that we have
// moved previously to a db that does not exist on the main connection server
if ((sc && sc->failoverConn != NULL) && differentStringNullOk(sc->db, sc->conn->db))
    {
    monitorPrint(sc, "SQL_MAINCONN_DB_INVALID", "%s != %s", sc->db, sc->conn->db);
    return TRUE;
    }

return FALSE;
}

char *sqlHostInfo(struct sqlConnection *sc)
/* Returns the mysql host info for the connection, must be connected. */
{
return (char *) mysql_get_host_info(sc->conn);
}

static struct sqlResult *sqlUseOrStore(struct sqlConnection *sc,
	char *query, ResGetter *getter, boolean abort)
/* Returns NULL if result was empty and getter==mysql_use_result.
 * Otherwise returns a structure that you can do sqlRow() on.
 * Watch out for subtle differences between mysql_store_result and mysql_use_result.
 * We seem to be only using mysql_use_result these days,
 * but mysql_store_result has left a big footprint in the code/comments.
 * In particular, mysql_store_result can return NULL indicating an empty resultset.
 * But mysql_use_result cannot do that. Instead NULL return means error
 * and the user must call next_row to see if there's anything in the resultset.
 */
{
struct sqlResult *res = NULL;
struct sqlConnection *scMain = sc;
long deltaTime;
boolean fixedMultipleNOSQLINJ = FALSE;

++sqlTotalQueries;

if (monitorFlags & JKSQL_TRACE)
    monitorPrintQuery(sc, query);

if (startsWith(NOSQLINJ "", query))
    {
    query += NOSQLINJ_SIZE; // We know this query has been vetted for sql injection, skip over this tag.
    }
else
    {
    sqlCheckError("Unvetted query: %s", query);
    }

// additional check finds errors of multiple NOSQLINJ tags
if (strstr(query, NOSQLINJ ""))
    {
    sqlCheckError("Oops, multiple occurrences of NOSQLINJ tag in query: %s", query);
    query = replaceChars(query, NOSQLINJ "", "");
    fixedMultipleNOSQLINJ = TRUE;
    }

if (sqlConnMustUseFailover(sc))
    sc = sc->failoverConn;

sqlConnectIfUnconnected(sc, abort);
assert(!sc->isFree);

monitorEnter();
int mysqlError = mysql_real_query(sc->conn, query, strlen(query));

// if the query fails on the main connection, connect the failover connection and try there
if (mysqlError != 0 && sc->failoverConn && sameWord(sqlGetDatabase(sc), sqlGetDatabase(sc->failoverConn)))
    {
    if (monitorFlags & JKSQL_TRACE)
        monitorPrint(sc, "SQL_FAILOVER", "%s -> %s | %s", scConnProfile(sc),
            scConnProfile(sc->failoverConn), query);

    sc = sc->failoverConn;
    if (sqlConnectIfUnconnected(sc, FALSE))
        mysqlError = mysql_real_query(sc->conn, query, strlen(query));
    else
        // This database does not exist on the (slow-db) failover mysql server
        // It makes more sense to the show the error message we got from our main db
        sc = scMain;
    }

if (mysqlError != 0)
    {
    if (abort)
        {
        monitorLeave();
	// Extra debugging info.
	if (sameOk(cfgOption("noSqlInj.dumpStack"), "on"))
    	    dumpStack("DEBUG Can't start query");
	sqlAbort(sc, "Can't start query:\n%s\n", query);
        }
    }
else
    {
    MYSQL_RES *resSet;
    if ((resSet = getter(sc->conn)) == NULL)
	{
	if (mysql_errno(sc->conn) != 0)
	    {
            monitorLeave();
	    sqlAbort(sc, "Can't use query:\n%s", query);
	    }
	}
    else
        {
        AllocVar(res);
        res->conn = sc;
        res->result = resSet;
        res->node = dlAddValTail(sc->resultList, res);
        res->fetchTime = 0L;
        }
    }
deltaTime = monitorLeave();
if (monitorFlags & JKSQL_TRACE)
    monitorPrint(sc, "SQL_TIME", "%0.3fs", ((double)deltaTime)/1000.0);
if (fixedMultipleNOSQLINJ)
    freeMem(query);
return res;
}

void sqlRenameTable(struct sqlConnection *sc, char *table1, char *table2)
/* Rename table1 to table2 */
{
char query[256];
sqlSafef(query, sizeof(query), "rename table %s to %s", table1, table2);
sqlUpdate(sc, query);
}

void sqlDropTable(struct sqlConnection *sc, char *table)
/* Drop table if it exists. */
{
if (sqlTableExists(sc, table))
    {
    char query[256];
    sqlSafef(query, sizeof(query), "drop table %s", table);
    sqlUpdate(sc, query);
    }
}

void sqlCopyTable(struct sqlConnection *sc, char *table1, char *table2)
/* Copy table1 to table2 */
{
char query[256];

if (table1 == NULL || table2 == NULL)
    return;
sqlSafef(query, sizeof(query), "create table %s like %s", table2, table1);
sqlUpdate(sc, query);
sqlSafef(query, sizeof(query), "insert into %s select * from  %s", table2, table1);
sqlUpdate(sc, query);
}

void sqlGetLockWithTimeout(struct sqlConnection *sc, char *name, int wait)
/* Tries to get an advisory lock on the process, waiting for wait seconds. */
/* Blocks another client from obtaining a lock with the same name. */
{
char query[256];
struct sqlResult *res;
char **row = NULL;

sqlSafef(query, sizeof(query), "select get_lock('%s', %d)", name, wait);
res = sqlGetResult(sc, query);
while ((row=sqlNextRow(res)))
    {
    if (sameWord(*row, "1")) // success
        break;
    else if (sameWord(*row, "0"))  // timed out
        errAbort("Attempt to GET_LOCK timed out.\nAnother client may have locked this name, %s\n.", name);
    else if (*row == NULL) // other error
        errAbort("Attempt to GET_LOCK of name, %s, caused an error\n", name);
    }
sqlFreeResult(&res);
}

void sqlGetLock(struct sqlConnection *sc, char *name)
/* Gets an advisory lock created by GET_LOCK in sqlGetLock. Waits up to 1000 seconds. */
{
sqlGetLockWithTimeout(sc, name, 1000);
}

boolean sqlIsLocked(struct sqlConnection *sc, char *name)
/* Tests if an advisory lock on the given name has been set. 
 * Returns true if lock has been set, otherwise returns false. */
{
char query[256];
struct sqlResult *res;
char **row = NULL;
boolean result = FALSE;

sqlSafef(query, sizeof(query), "select is_free_lock('%s')", name);
res = sqlGetResult(sc, query);
while ((row=sqlNextRow(res)))
    {
    if (sameWord(*row, "1")) // lock is free (not locked)
	{
	result = FALSE;
        break;
	}
    else if (sameWord(*row, "0"))  // lock is not free (locked)
	{
	result = TRUE;
        break;
	}
    else if (*row == NULL) // other error
        errAbort("Attempt to GET_LOCK of name, %s, caused an error\n", name);
    }
sqlFreeResult(&res);
return result;
}


void sqlReleaseLock(struct sqlConnection *sc, char *name)
/* Releases an advisory lock created by GET_LOCK in sqlGetLock */
{
char query[256];

sqlSafef(query, sizeof(query), "select release_lock('%s')", name);
sqlUpdate(sc, query);
}

void sqlHardUnlockAll(struct sqlConnection *sc)
/* Unlock any hard locked tables. */
{
if (sc->hasHardLock)
    {
    char query[32];
    sqlSafef(query, sizeof query, "unlock tables");
    sqlUpdate(sc, query);
    sc->hasHardLock = FALSE;
    }
}

void sqlHardLockTables(struct sqlConnection *sc, struct slName *tableList,
	boolean isWrite)
/* Hard lock given table list.  Unlock with sqlHardUnlockAll. */
{
struct dyString *dy = dyStringNew(0);
struct slName *table;
char *how = (isWrite ? "WRITE" : "READ");

if (sc->hasHardLock)
    errAbort("sqlHardLockTables repeated without sqlHardUnlockAll.");
sqlDyStringPrintf(dy, "LOCK TABLES ");
for (table = tableList; table != NULL; table = table->next)
    {
    sqlDyStringPrintf(dy, "%s %s", table->name, how);
    if (table->next != NULL)
       sqlDyStringPrintf(dy, ",");
    }
sqlUpdate(sc, dy->string);

sc->hasHardLock = TRUE;
dyStringFree(&dy);
}

void sqlHardLockTable(struct sqlConnection *sc, char *table, boolean isWrite)
/* Lock a single table.  Unlock with sqlHardUnlockAll. */
{
struct slName *list = slNameNew(table);
sqlHardLockTables(sc, list, isWrite);
slFreeList(&list);
}

void sqlHardLockAll(struct sqlConnection *sc, boolean isWrite)
/* Lock all tables in current database.  Unlock with sqlHardUnlockAll. */
{
struct slName *tableList =  sqlListTables(sc);
sqlHardLockTables(sc, tableList, isWrite);
slFreeList(&tableList);
}

boolean sqlMaybeMakeTable(struct sqlConnection *sc, char *table, char *query)
/* Create table from query if it doesn't exist already.
 * Returns FALSE if didn't make table. */
{
if (sqlTableExists(sc, table))
    return FALSE;
sqlUpdate(sc, query);
return TRUE;
}

char *sqlGetCreateTable(struct sqlConnection *sc, char *table)
/* Get the Create table statement. table must exist. */
{
char query[256];
struct sqlResult *res;
char **row = NULL;
char *statement = NULL;

sqlSafef(query, sizeof(query), "show create table %s", table);
res = sqlGetResult(sc, query);
if ((row=sqlNextRow(res)))
    {
    // skip first column which has useless table name in it.
    statement = cloneString(row[1]);
    }
sqlFreeResult(&res);
return statement;
}

void sqlRemakeTable(struct sqlConnection *sc, char *table, char *create)
/* Drop table if it exists, and recreate it. */
{
sqlDropTable(sc, table);
sqlUpdate(sc, create);
}

boolean sqlDatabaseExists(char *database)
/* Return TRUE if database exists. */
{
struct sqlConnection *conn = sqlMayConnect(database);
boolean exists = (conn != NULL);
sqlDisconnect(&conn);
return exists;
}

boolean sqlTableExists(struct sqlConnection *sc, char *table)
/* Return TRUE if a table exists. 
 *
 * If a failover connection is configured in hg.conf, looks up table in the main connection first 
 * Uses a table name cache table, if configured in hg.conf
 */
{
char query[256];
struct sqlResult *sr;
if (sameString(table,""))
    {
    if (sameOk(cfgOption("noSqlInj.dumpStack"), "on"))
	{
	dumpStack("jksql sqlTableExists: Buggy code is feeding me empty table name. table=[%s].\n", table);
	fflush(stderr); // log only
	}
    return FALSE;
    }
// TODO If the ability to supply a list of tables is hardly used,
// then we could switch it to simply %s below supporting a single
// table at a time more securely.
if (strchr(table,','))
    {
    if (sameOk(cfgOption("noSqlInj.dumpStack"), "on"))
	dumpStack("sqlTableExists called on multiple tables with table=[%s]\n", table);
    }
if (strchr(table,'%'))
    {
    if (sameOk(cfgOption("noSqlInj.dumpStack"), "on"))
	{
	dumpStack("jksql sqlTableExists: Buggy code is feeding me junk wildcards. table=[%s].\n", table);
	fflush(stderr); // log only
	}
    return FALSE;
    }
if (strchr(table,'-'))
    {
    return FALSE;  // mysql does not allow tables with dash (-) so it will not be found.
    // hg/lib/hdb.c can generate an invalid table names with dashes while looking for split tables,
    // if the first chrom name has a dash in it. Examples found were: scaffold_0.1-193456 scaffold_0.1-13376 HERVE_a-int 1-1
    // Assembly hubs also may have dashes in chrom names.
    }

// use the table cache if we have one
struct sqlConnection *cacheConn = sqlTableCacheFindConn(sc);
if (cacheConn)
    return sqlTableCacheTableExists(cacheConn, table);

char *err;
unsigned int errNo;
const int tableNotFoundCode = 1146;

sqlCkIl(tableSafe,table)
//char tableSafe[strlen(table)+9+1]; sqlCheckIdentifiersList(tableSafe, sizeof tableSafe, table);
sqlSafef(query, sizeof(query), "SELECT 1 FROM %-s LIMIT 0", tableSafe);  

if ((sr = sqlGetResultExt(sc, query, &errNo, &err)) == NULL)
    {
    if (errNo == tableNotFoundCode)
        return FALSE;
    if (sc->failoverConn)
	{
	// if not found but we have a failover connection, check on it, too
	if ((sr = sqlGetResultExt(sc->failoverConn, query, &errNo, &err)) == NULL)
	    {
	    if (errNo == tableNotFoundCode)
		return FALSE;
	    }
	}
    }

if (!sr)
    errAbort("Mysql error during sqlTableExists(%s) %d: %s", table, errNo, err);

sqlFreeResult(&sr);
return TRUE;
}

// Note: this is copied from hdb.c's hParseDbDotTable.  Normally I abhor copying but I really
// don't want to make jksql.c depend on hdb.h...
void sqlParseDbDotTable(char *dbIn, char *dbDotTable, char *dbOut, size_t dbOutSize,
                        char *tableOut, size_t tableOutSize)
/* If dbDotTable contains a '.', then assume it is db.table and parse out into dbOut and tableOut.
 * If not, then it's just a table; copy dbIn into dbOut and dbDotTable into tableOut. */
{
char *dot = strchr(dbDotTable, '.');
char *table = dbDotTable;
if (dot != NULL)
    {
    safencpy(dbOut, dbOutSize, dbDotTable, dot - dbDotTable);
    table = &dot[1];
    }
else
    safecpy(dbOut, dbOutSize, dbIn);
safecpy(tableOut, tableOutSize, table);
}

// forward declaration to avoid moving code around:
static boolean sqlConnChangeDbMainOrFailover(struct sqlConnection *sc, char *database, boolean abort);

bool sqlColumnExists(struct sqlConnection *conn, char *table, char *column)
/* return TRUE if column exists in table. column can contain sql wildcards  */
{
// "show columns ... like" does not support db.table names, so temporarily change database
// if table is really db.table.
char *connDb = cloneString(sqlGetDatabase(conn));
char tableDb[1024];
char tableName[1024];
sqlParseDbDotTable(connDb, table, tableDb, sizeof tableDb, tableName, sizeof tableName);
char query[1024];
sqlSafef(query, 1024, "SHOW COLUMNS FROM `%s` LIKE '%s'", tableName, column);
boolean changeDb = differentStringNullOk(connDb, tableDb);
if (changeDb)
    sqlConnChangeDbMainOrFailover(conn, tableDb, TRUE);
char buf[1024];
char *ret = sqlQuickQuery(conn, query, buf, 1024);
if (changeDb)
    sqlConnChangeDbMainOrFailover(conn, connDb, TRUE);
freeMem(connDb);
return (ret!=NULL);
}

boolean sqlColumnExistsInTablesList(struct sqlConnection *conn, char *tables, char *field)
/* check if column exists in a list of tables */
{
boolean result = FALSE;
struct slName *tablesList = slNameListFromComma(tables);
struct slName *table;
for(table = tablesList; table; table = table->next)
    {
    if (sqlColumnExists(conn, table->name, field))
	{
	result = TRUE;
	break;
	}
    }
slFreeList(&tablesList);
return result;
}

int sqlTableSizeIfExists(struct sqlConnection *sc, char *table)
/* Return row count if a table exists, -1 if it doesn't. */
{
char query[256];
struct sqlResult *sr;
char **row = 0;
int ret = 0;

sqlSafef(query, sizeof(query), "select count(*) from %s", table);
if ((sr = sqlUseOrStore(sc, query, DEFAULTGETTER, FALSE)) == NULL)
    return -1;
row = sqlNextRow(sr);
if (row != NULL && row[0] != NULL)
    ret = atoi(row[0]);
sqlFreeResult(&sr);
return ret;
}

boolean sqlTablesExist(struct sqlConnection *conn, char *tables)
/* Check all tables in space delimited string exist. */
{
char *dupe = cloneString(tables);
char *s = dupe, *word;
boolean ok = TRUE;
while ((word = nextWord(&s)) != NULL)
     {
     if (!sqlTableExists(conn, word))
         {
	 ok = FALSE;
	 break;
	 }
     }
freeMem(dupe);
return ok;
}

boolean sqlTableWildExists(struct sqlConnection *sc, char *table)
/* Return TRUE if table (which can include SQL wildcards) exists.
 * A bit slower than sqlTableExists. */
{
char query[512];
struct sqlResult *sr;
char **row;
boolean exists;
// "show tables" does not support db.table names, so temporarily change database
// if table is really db.table.
char *connDb = cloneString(sqlGetDatabase(sc));
char tableDb[1024];
char tableName[1024];
sqlParseDbDotTable(connDb, table, tableDb, sizeof tableDb, tableName, sizeof tableName);

sqlSafef(query, sizeof(query), "show tables like '%s'", tableName);
boolean changeDb = differentStringNullOk(connDb, tableDb);
if (changeDb)
    sqlConnChangeDbMainOrFailover(sc, tableDb, TRUE);
sr = sqlGetResult(sc, query);
exists = ((row = sqlNextRow(sr)) != NULL);
sqlFreeResult(&sr);
if (changeDb)
    sqlConnChangeDbMainOrFailover(sc, connDb, TRUE);
freeMem(connDb);
return exists;
}

static char **sqlMaybeNextRow(struct sqlResult *sr, boolean *retOk)
/* Get next row from query result; set retOk according to error status. */
{
char** row = NULL;
if (sr != NULL)
    {
    monitorEnter();
    row = mysql_fetch_row(sr->result);
    sr->fetchTime += monitorLeave();
    if (mysql_errno(sr->conn->conn) != 0)
	{
	if (retOk != NULL)
	    *retOk = FALSE;
	}
    else if (retOk != NULL)
	*retOk = TRUE;
    }
else if (retOk != NULL)
    *retOk = TRUE;
return row;
}



struct sqlResult *sqlGetResultExt(struct sqlConnection *sc, char *query, unsigned int *errorNo, char **error)
/* Returns NULL if it had an error.
 * Otherwise returns a structure that you can do sqlRow() on.
 * If there was an error, *errorNo will be set to the mysql error number,
 * and *error will be set to the mysql error string, which MUST NOT be freed. */
{
struct sqlResult *sr = sqlUseOrStore(sc, query, DEFAULTGETTER, FALSE);
if (sr == NULL)
    {
    MYSQL *conn = sc->conn;
    if (errorNo)
    	*errorNo=mysql_errno(conn);
    if (error)
    	*error=(char *)mysql_error(conn);
    }
else
    {
    if (errorNo)
    	*errorNo=0;
    if (error)
    	*error=NULL;
    }
return sr;
}

struct sqlResult *sqlGetResult(struct sqlConnection *sc, char *query)
/* 
 * Return a structure that you can do sqlNextRow() on. 
 * (You need to check the return value of sqlRow to find out if there are
 * any results.) */
{
return sqlUseOrStore(sc, query, DEFAULTGETTER, TRUE);
}

struct sqlResult *sqlMustGetResult(struct sqlConnection *sc, char *query)
/* 
 * Return a structure that you can do sqlNextRow() on. 
 * DOES NOT errAbort() IF THERE ARE NO RESULTS 
 * (These days, with mysql_use_result, we cannot know ahead of time
 * if there are results, we can only know by actually trying to fetch a row.
 * So in fact right now sqlMustGetResult is no different than sqlGetResult.) */
{
struct sqlResult *res = sqlGetResult(sc,query);
if (res == NULL)
	errAbort("Object not found in database.\nQuery was %s", query);
return res;
}


void sqlUpdate(struct sqlConnection *conn, char *query)
/* Tell database to do something that produces no results table. */
{
struct sqlResult *sr;
sr = sqlGetResult(conn,query);
sqlFreeResult(&sr);
}

int sqlUpdateRows(struct sqlConnection *conn, char *query, int* matched)
/* Execute an update query, returning the number of rows changed.  If matched
 * is not NULL, it gets the total number matching the query. */
{
int numChanged = 0;
int numMatched = 0;
const char *info;
int numScan = 0;
struct sqlResult *sr = sqlGetResult(conn,query);

/* Rows matched: 40 Changed: 40 Warnings: 0 */
monitorEnter();
info = mysql_info(conn->conn);
monitorLeave();
if (info != NULL)
    numScan = sscanf(info, "Rows matched: %d Changed: %d Warnings: %*d",
                     &numMatched, &numChanged);
if ((info == NULL) || (numScan < 2))
    errAbort("can't get info (maybe not an sql UPDATE): %s", query);
sqlFreeResult(&sr);
if (matched != NULL)
    *matched = numMatched;
return numChanged;
}

void sqlWarnings(struct sqlConnection *conn, int numberOfWarnings)
/* Show the number of warnings requested. New feature in mysql5. */
{
struct sqlResult *sr;
char **row;
char query[256];
struct dyString *dy = dyStringNew(0);
sqlSafef(query,sizeof(query),"show warnings limit 0, %d", numberOfWarnings);
sr = sqlGetResult(conn, query);
dyStringPrintf(dy, "Level Code Message\n");
while ((row = sqlNextRow(sr)) != NULL)
    {
    dyStringPrintf(dy, "%s %s %s\n", row[0], row[1], row[2]);
    }
sqlFreeResult(&sr);
warn("%s", dy->string);
dyStringFree(&dy);
}

int sqlWarnCount(struct sqlConnection *conn)
/* Return the number of warnings. New feature in mysql5. */
{
char query[64];
sqlSafef(query, sizeof query, "SHOW COUNT(*) WARNINGS");
return sqlQuickNum(conn, query);
}


void sqlLoadTabFile(struct sqlConnection *conn, char *path, char *table,
                    unsigned options)
/* Load a tab-seperated file into a database table, checking for errors.
 * Options are the SQL_TAB_* bit set. SQL_TAB_FILE_ON_SERVER is ignored if
 * sqlIsRemote() returns true. */
{
assert(!conn->isFree);
char tabPath[PATH_LEN];
char query[PATH_LEN+256];
int numScan, numRecs, numSkipped, numWarnings;
char *localOpt, *concurrentOpt, *dupOpt;
const char *info;
struct sqlResult *sr;

/* Doing an "alter table disable keys" command implicitly commits the current
   transaction. Don't want to use that optimization if we need to be transaction
   safe. */
/* FIXME: markd 2003/01/05: mysql 4.0.17 - the alter table enable keys hangs,
 * disable this optimization for now. Verify performance on small loads
 * before re-enabling*/
# if 0
boolean doDisableKeys = !(options & SQL_TAB_TRANSACTION_SAFE);
#else
boolean doDisableKeys = FALSE;
#endif

/* determine if tab file can be accessed directly by the database, or send
 * over the network */
bool sqlNeverLocal = cfgOptionBooleanDefault("db.neverLocal", 0);
if (((options & SQL_TAB_FILE_ON_SERVER) && !sqlIsRemote(conn)) | sqlNeverLocal)
    {
    /* tab file on server requiries full path */
    strcpy(tabPath, "");
    if (path[0] != '/')
        {
        if (getcwd(tabPath, sizeof(tabPath)) == NULL)
	    errAbort("sqlLoadTableFile: getcwd failed");
        safecat(tabPath, sizeof(tabPath), "/");
        }
    safecat(tabPath, sizeof(tabPath), path);
    localOpt = "";
    }
else
    {
    safecpy(tabPath, sizeof(tabPath), path);
    localOpt = "LOCAL";
    }

/* optimize for concurrent to others to access the table. */
if (options & SQL_TAB_FILE_CONCURRENT)
    concurrentOpt = "CONCURRENT";
else
    {
    concurrentOpt = "";
    if (doDisableKeys)
        {
        /* disable update of indexes during load. Inompatible with concurrent,
         * since enable keys locks other's out. */
        sqlSafef(query, sizeof(query), "ALTER TABLE %s DISABLE KEYS", table);
        sqlUpdate(conn, query);
        }
    }

if (options & SQL_TAB_REPLACE)
    dupOpt = "REPLACE";
else
    dupOpt = "";

sqlSafef(query, sizeof(query),  "LOAD DATA %s %s INFILE '%s' %s INTO TABLE %s",
      concurrentOpt, localOpt, tabPath, dupOpt, table);
sr = sqlGetResult(conn, query);
monitorEnter();
info = mysql_info(conn->conn);
monitorLeave();

if (info == NULL)
    errAbort("no info available for result of sql query: %s", query);
numScan = sscanf(info, "Records: %d Deleted: %*d  Skipped: %d  Warnings: %d",
                 &numRecs, &numSkipped, &numWarnings);
if (numScan != 3)
    errAbort("can't parse sql load info: %s", info);
sqlFreeResult(&sr);

/* mysql 5.0 bug: mysql_info returns unreliable warnings count, so use this instead: */
numWarnings = sqlWarnCount(conn);

if ((numSkipped > 0) || (numWarnings > 0))
    {
    boolean doAbort = TRUE;
    if ((numSkipped > 0) && (options & SQL_TAB_FILE_WARN_ON_ERROR))
        doAbort = FALSE;  /* don't abort on errors */
    else if ((numWarnings > 0) &&
             (options & (SQL_TAB_FILE_WARN_ON_ERROR|SQL_TAB_FILE_WARN_ON_WARN)))
        doAbort = FALSE;  /* don't abort on warnings */
    if (numWarnings > 0)
	{
	sqlWarnings(conn, 10);  /* show the first 10 warnings */
	}
    if (doAbort)
        errAbort("load of %s did not go as planned: %d record(s), "
                 "%d row(s) skipped, %d warning(s) loading %s",
                 table, numRecs, numSkipped, numWarnings, path);
    else
        warn("Warning: load of %s did not go as planned: %d record(s), "
             "%d row(s) skipped, %d warning(s) loading %s",
             table, numRecs, numSkipped, numWarnings, path);
    }


if (((options & SQL_TAB_FILE_CONCURRENT) == 0) && doDisableKeys)
    {
    /* reenable update of indexes */
    sqlSafef(query, sizeof(query), "ALTER TABLE %s ENABLE KEYS", table);
    sqlUpdate(conn, query);
    }
}

boolean sqlExists(struct sqlConnection *conn, char *query)
/* Query database and return TRUE if it had a non-empty result. */
{
struct sqlResult *sr;
if ((sr = sqlGetResult(conn,query)) == NULL)
    return FALSE;
else
    {
    if(sqlNextRow(sr) == NULL)
	{
	sqlFreeResult(&sr);
	return FALSE;
	}
    else
	{
	sqlFreeResult(&sr);
	return TRUE;
	}
    }
}

boolean sqlRowExists(struct sqlConnection *conn,
	char *table, char *field, char *key)
/* Return TRUE if row where field = key is in table. */
{
char query[256];
sqlSafef(query, sizeof(query), "select count(*) from %s where %s = '%s'",
	table, field, key);
return sqlQuickNum(conn, query) > 0;
}

int sqlRowCount(struct sqlConnection *conn, char *queryTblAndCondition)
/* Return count of rows that match condition. The queryTblAndCondition
 * should contain everying after "select count(*) FROM " */
{
char query[256];
sqlSafef(query, sizeof(query), "select count(*) from %-s",queryTblAndCondition);  
return sqlQuickNum(conn, query);
}


struct sqlResult *sqlStoreResult(struct sqlConnection *sc, char *query)
/* Returns NULL if result was empty.  Otherwise returns a structure
 * that you can do sqlRow() on.  Same interface as sqlGetResult,
 * but internally this keeps the entire result in memory. */
{
return sqlUseOrStore(sc,query,mysql_store_result, TRUE);
}

char **sqlNextRow(struct sqlResult *sr)
/* Get next row from query result. */
{
boolean ok = FALSE;
char** row = sqlMaybeNextRow(sr, &ok);
if (! ok)
    sqlAbort(sr->conn, "nextRow failed");
return row;
}

char* sqlFieldName(struct sqlResult *sr)
/* repeated calls to this function returns the names of the fields
 * the given result */
{
MYSQL_FIELD *field;
field = mysql_fetch_field(sr->result);
if(field == NULL)
    return NULL;
return field->name;
}

struct slName *sqlResultFieldList(struct sqlResult *sr)
/* Return slName list of all fields in query.  Can just be done once per query. */
{
struct slName *list = NULL;
char *field;
while ((field = sqlFieldName(sr)) != NULL)
    slNameAddHead(&list, field);
slReverse(&list);
return list;
}

int sqlResultFieldArray(struct sqlResult *sr, char ***retArray)
/* Get the fields of sqlResult,  returning count, and the results
 * themselves in *retArray. */
{
struct slName *el, *list = sqlResultFieldList(sr);
int count = slCount(list);
char **array;
AllocArray(array, count);
int i;
for (el=list,i=0; el != NULL; el = el->next, ++i)
    array[i] = cloneString(el->name);
*retArray = array;
return count;
}

int sqlFieldColumn(struct sqlResult *sr, char *colName)
/* get the column number of the specified field in the result, or
 * -1 if the result doesn't contain the field.*/
{
int numFields = mysql_num_fields(sr->result);
int i;
for (i = 0; i < numFields; i++)
    {
    MYSQL_FIELD *field = mysql_fetch_field_direct(sr->result, i);
    if (sameString(field->name, colName))
        return i;
    }
return -1;
}

int sqlCountRows(struct sqlResult *sr)
/* From a sqlResult that was obtained with sqlStoreResult, return the number of rows */
{
if(sr != NULL)
    return mysql_num_rows(sr->result);
return 0;
}

int sqlCountColumns(struct sqlResult *sr)
/* Count the number of columns in result. */
{
if(sr != NULL)
    return mysql_field_count(sr->conn->conn);
return 0;
}

int sqlFieldCount(struct sqlResult *sr)
/* Return number of fields in a row of result. */
{
if (sr == NULL)
    return 0;
return mysql_num_fields(sr->result);
}

int sqlCountColumnsInTable(struct sqlConnection *sc, char *table)
/* Return the number of columns in a table */
{
struct sqlResult *sr;
char **row;
int count;

/* Read table description and count rows. */
sr = sqlDescribe(sc, table);
count = 0;
while ((row = sqlNextRow(sr)) != NULL)
    {
    count++;
    }
sqlFreeResult(&sr);
return count;
}

char *sqlQuickQuery(struct sqlConnection *sc, char *query, char *buf, int bufSize)
/* Does query and returns first field in first row.  Meant
 * for cases where you are just looking up one small thing.
 * Returns NULL if query comes up empty. */
{
struct sqlResult *sr;
char **row;
char *ret = NULL;

if ((sr = sqlGetResult(sc, query)) == NULL)
    return NULL;
row = sqlNextRow(sr);
if (row != NULL && row[0] != NULL)
    {
    safecpy(buf, bufSize, row[0]);
    ret = buf;
    }
sqlFreeResult(&sr);
return ret;
}

char *sqlNeedQuickQuery(struct sqlConnection *sc, char *query,
	char *buf, int bufSize)
/* Does query and returns first field in first row.  Meant
 * for cases where you are just looking up one small thing.
 * Prints error message and aborts if query comes up empty. */
{
char *s = sqlQuickQuery(sc, query, buf, bufSize);
if (s == NULL)
    errAbort("query not found: %s", query);
return s;
}


int sqlQuickNum(struct sqlConnection *conn, char *query)
/* Get numerical result from simple query */
{
struct sqlResult *sr;
char **row;
int ret = 0;

sr = sqlGetResult(conn, query);
row = sqlNextRow(sr);
if (row != NULL && row[0] != NULL)
    ret = atoi(row[0]);
sqlFreeResult(&sr);
return ret;
}

long long sqlQuickLongLong(struct sqlConnection *conn, char *query)
/* Get long long numerical result from simple query. Returns 0 if query not found */
{
struct sqlResult *sr;
char **row;
long long ret = 0;

sr = sqlGetResult(conn, query);
row = sqlNextRow(sr);
if (row != NULL && row[0] != NULL)
    ret = sqlLongLong(row[0]);
sqlFreeResult(&sr);
return ret;
}

double sqlQuickDouble(struct sqlConnection *conn, char *query)
/* Get floating point numerical result from simple query */
{
struct sqlResult *sr;
char **row;
double ret = 0;

sr = sqlGetResult(conn, query);
row = sqlNextRow(sr);
if (row != NULL && row[0] != NULL)
    ret = atof(row[0]);
sqlFreeResult(&sr);
return ret;
}


int sqlNeedQuickNum(struct sqlConnection *conn, char *query)
/* Get numerical result or die trying. */
{
char buf[32];
sqlNeedQuickQuery(conn, query, buf, sizeof(buf));
if (!((buf[0] == '-' && isdigit(buf[1])) || isdigit(buf[0])))
    errAbort("Expecting numerical result to query '%s' got '%s'",
    	query, buf);
return sqlSigned(buf);
}

char *sqlQuickString(struct sqlConnection *sc, char *query)
/* Return result of single-row/single column query in a
 * string that should eventually be freeMem'd. */
{
struct sqlResult *sr;
char **row;
char *ret = NULL;

if ((sr = sqlGetResult(sc, query)) == NULL)
    return NULL;
row = sqlNextRow(sr);
if (row != NULL && row[0] != NULL)
    ret = cloneString(row[0]);
sqlFreeResult(&sr);
return ret;
}

char *sqlNeedQuickString(struct sqlConnection *sc, char *query)
/* Return result of single-row/single column query in a
 * string that should eventually be freeMem'd.  This will
 * print an error message and abort if result returns empty. */
{
char *s = sqlQuickString(sc, query);
if (s == NULL)
    errAbort("query not found: %s", query);
return s;
}

char *sqlQuickNonemptyString(struct sqlConnection *conn, char *query)
/* Return first result of given query.  If it is an empty string
 * convert it to NULL. */
{
char *result = sqlQuickString(conn, query);
if (result != NULL && result[0] == 0)
    freez(&result);
return result;
}

struct slName *sqlQuickList(struct sqlConnection *conn, char *query)
/* Return a list of slNames for a single column query.
 * Do a slFreeList on result when done. */
{
struct slName *list = NULL, *n;
struct sqlResult *sr;
char **row;
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    {
    n = slNameNew(row[0]);
    slAddHead(&list, n);
    }
sqlFreeResult(&sr);
slReverse(&list);
return list;
}

struct hash *sqlQuickHash(struct sqlConnection *conn, char *query)
/* Return a hash filled with results of two column query.
 * The first column is the key, the second the value. */
{
struct hash *hash = hashNew(16);
struct sqlResult *sr;
char **row;
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    hashAdd(hash, row[0], cloneString(row[1]));
sqlFreeResult(&sr);
return hash;
}

struct slInt *sqlQuickNumList(struct sqlConnection *conn, char *query)
/* Return a list of slInts for a single column query.
 * Do a slFreeList on result when done. */
{
struct slInt *list = NULL, *n;
struct sqlResult *sr;
char **row;
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    {
    n = slIntNew(sqlSigned(row[0]));
    slAddHead(&list, n);
    }
sqlFreeResult(&sr);
slReverse(&list);
return list;
}

struct slDouble *sqlQuickDoubleList(struct sqlConnection *conn, char *query)
/* Return a list of slDoubles for a single column query.
 * Do a slFreeList on result when done. */
{
struct slDouble *list = NULL, *n;
struct sqlResult *sr;
char **row;
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    {
    n = slDoubleNew(atof(row[0]));
    slAddHead(&list, n);
    }
sqlFreeResult(&sr);
slReverse(&list);
return list;
}

struct slPair *sqlQuickPairList(struct sqlConnection *conn, char *query)
/* Return a list of slPairs with the results of a two-column query.
 * Free result with slPairFreeValsAndList. */
{
struct slPair *pairList = NULL;
struct sqlResult *sr = sqlGetResult(conn, query);
char **row;
while ((row = sqlNextRow(sr)) != NULL)
    slAddHead(&pairList, slPairNew(row[0], cloneString(row[1])));
sqlFreeResult(&sr);
slReverse(&pairList);
return pairList;
}


int sqlTableSize(struct sqlConnection *conn, char *table)
/* Find number of rows in table. */
{
char query[128];
sqlSafef(query, sizeof(query), "SELECT COUNT(*) FROM %s", table);
return sqlQuickNum(conn, query);
}

int sqlFieldIndex(struct sqlConnection *conn, char *table, char *field)
/* Returns index of field in a row from table, or -1 if it
 * doesn't exist. */
{
struct sqlResult *sr;
char **row;
int i = 0, ix=-1;

/* Read table description into hash. */
sr = sqlDescribe(conn, table);
while ((row = sqlNextRow(sr)) != NULL)
    {
    if (sameString(row[0], field))
        {
	ix = i;
	break;
	}
    ++i;
    }
sqlFreeResult(&sr);
return ix;
}

struct slName *sqlFieldNames(struct sqlConnection *conn, char *table)
/* Returns field names from a table. */
{
struct slName *list = NULL;
struct sqlResult *sr;
char **row;
sr = sqlDescribe(conn, table);
while ((row = sqlNextRow(sr)) != NULL)
    slNameAddHead(&list, row[0]);
sqlFreeResult(&sr);
slReverse(&list);
return list;
}

unsigned int sqlLastAutoId(struct sqlConnection *conn)
/* Return last automatically incremented id inserted into database. */
{
assert(!conn->isFree);
unsigned id;
monitorEnter();
id = mysql_insert_id(conn->conn);
monitorLeave();
return id;
}

/* Stuff to manage and caches of open connections on a database.  Typically
 * you only need 3.  MySQL takes about 2 milliseconds on a local host to open
 * a connection.  On a remote host it can be more and this caching is probably
 * actually necessary. However, much code has been written assuming caching,
 * so it is probably now necessary.
 */

enum {sqlConnCacheMax = 16};

struct sqlConnCache
{
    /* the following are NULL unless explicitly specified */
    char *host;					/* Host machine of database. */
    char *user;					/* Database user name */
    char *password;				/* Password. */
    struct sqlProfile *profile;                 /* restrict to this profile */
    /* contents of cache */
    int entryCnt;                               /* # open connections. */
    struct sqlConnCacheEntry *entries;          /* entries in the cache */
};

struct sqlConnCacheEntry
/* an entry in the cache */
{
    struct sqlConnCacheEntry *next;
    struct sqlProfile *profile;      /* profile for connection, can be NULL if host is explicit */
    struct sqlConnection *conn;      /* connection */
    boolean inUse;                   /* is this in use? */
};

struct sqlConnCache *sqlConnCacheNewRemote(char *host, char *user,
                                           char *password)
/* Set up a cache on a remote database. */
{
struct sqlConnCache *cache;
AllocVar(cache);
cache->host = cloneString(host);
cache->user = cloneString(user);
cache->password = cloneString(password);
return cache;
}

struct sqlConnCache *sqlConnCacheNew()
/* Return a new connection cache. */
{
struct sqlConnCache *cache;
AllocVar(cache);
return cache;
}

struct sqlConnCache *sqlConnCacheNewProfile(char *profileName)
/* Return a new connection cache associated with the particular profile. */
{
struct sqlConnCache *cache = sqlConnCacheNew();
cache->profile = sqlProfileMustGet(profileName, NULL);
return cache;
}

void sqlConnCacheFree(struct sqlConnCache **pCache)
/* Dispose of a connection cache. */
{
struct sqlConnCache *cache;
if ((cache = *pCache) != NULL)
    {
    struct sqlConnCacheEntry *scce;
    for (scce = cache->entries; scce != NULL; scce = scce->next)
        {
        scce->conn->inCache = FALSE;
        scce->conn->isFree = FALSE;
	sqlDisconnect(&scce->conn);
        }
    slFreeList(&cache->entries);
    freeMem(cache->host);
    freeMem(cache->user);
    freeMem(cache->password);
    freez(pCache);
    }
}

static struct sqlConnCacheEntry *sqlConnCacheAdd(struct sqlConnCache *cache,
                                                 struct sqlProfile *profile,
                                                 struct sqlConnection *conn)
/* create and add a new cache entry */
{
struct sqlConnCacheEntry *scce;
AllocVar(scce);
scce->profile = profile;
scce->conn = conn;
conn->inCache = TRUE;
conn->isFree = TRUE;
slAddHead(&cache->entries, scce);
cache->entryCnt++;
return scce;
}

static boolean sqlConnCacheEntryDbMatch(struct sqlConnCacheEntry *scce,
                                        char *database)
/* does a database match the one in the connection cache? */
{
return (sameOk(database, sqlGetDatabase(scce->conn)));
}

static int sqlConnChangeDb(struct sqlConnection *sc, char *database, bool mustConnect)
/* change the db variable of an sqlConnection, try to change the mysql db and
 * return the result code. */
{
// update the db variable
monitorPrint(sc, "SQL_SET_DB", "%s", database);
freeMem(sc->db);
sc->db = cloneString(database);

if (mustConnect)
    {
    if (!sqlConnectIfUnconnected(sc, FALSE))
        {
        monitorPrint(sc, "SQL_SET_DB_FAILED", "%s", database);
        return -1;
        }
    }

// change the db
int resCode = 0;
if (sc->conn)
    {
    resCode = mysql_select_db(sc->conn, database);
    if (resCode!=0)
        monitorPrint(sc, "SQL_SET_DB_ERROR", "%d", resCode);
    }

sc->hasTableCache = -1; // -1 = undefined
return resCode;
}

static boolean sqlConnChangeDbFailover(struct sqlConnection *sc, char *database, boolean abort)
/* only fail if both main and failover cannot connect */
/* This allows to have databases that exist only on one of both servers */

{
int mainConnErr   = sqlConnChangeDb(sc, database, TRUE);
int foConnErr     = sqlConnChangeDb(sc->failoverConn, database, sqlConnMustUseFailover(sc));

if (mainConnErr!=0 && foConnErr!=0)
    {
    if (abort)
        {
        struct sqlConnection *errSc;
        if (foConnErr!=0)
            errSc = sc->failoverConn;
        else
            errSc = sc;
        sqlAbort(sc, "Couldn't set connection database to %s\n%s", database, mysql_error(errSc->conn));
        }
    return FALSE;
    }

return TRUE;
}

static boolean sqlConnChangeDbMain(struct sqlConnection *sc, char *database, boolean abort)
/* change the database of an sql connection */
{
int connErr = sqlConnChangeDb(sc, database, abort);
if (connErr != 0)
    {
    if (abort) 
        sqlAbort(sc, "Couldn't set connection database to %s", database);
    return FALSE;
    }
return TRUE;
}

static boolean sqlConnChangeDbMainOrFailover(struct sqlConnection *sc, char *database, boolean abort)
/* change the database of an sql connection, using failover if applicable */
{
if (sc->failoverConn == NULL)
    return sqlConnChangeDbMain(sc, database, abort);
else
    return sqlConnChangeDbFailover(sc, database, abort);
}

static boolean sqlConnCacheEntrySetDb(struct sqlConnCacheEntry *scce,
                                      char *database,
                                      boolean abort)
/* set the connect cache and connect to the specified database */
{
struct sqlConnection *sc = scce->conn;

return sqlConnChangeDbMainOrFailover(sc, database, abort);
}

static struct sqlConnCacheEntry *sqlConnCacheFindFree(struct sqlConnCache *cache,
                                                      struct sqlProfile *profile,
                                                      char *database,
                                                      boolean matchDatabase)
/* find a free entry associated with profile and database. Return NULL if no
 * entries are available.  Will attempt to match database if requested, this
 * includes connections to no database (database==NULL). */
{
struct sqlConnCacheEntry *scce;
for (scce = cache->entries; scce != NULL; scce = scce->next)
    {
    if (!scce->inUse && (profile == scce->profile)
        && ((!matchDatabase) || sqlConnCacheEntryDbMatch(scce, database)))
        {
        return scce;
        }
    }
return NULL;
}

static struct sqlConnCacheEntry *sqlConnCacheAddNew(struct sqlConnCache *cache,
                                                    struct sqlProfile *profile,
                                                    char *database,
                                                    boolean abort)
/* create and add a new connect to the cache */
{
struct sqlConnection *conn;
if (cache->entryCnt >= sqlConnCacheMax)
    errAbort("Too many open sqlConnections for cache");
if (cache->host != NULL)
    {
    struct sqlProfile *clone = sqlProfileClone(profile);
    clone->host = cache->host;
    clone->port = 0;
    clone->socket = NULL;
    clone->user = cache->user;
    clone->password = cache->password;
    conn = sqlConnRemote(clone, database, abort);
    }
else
    {
    conn = sqlConnProfile(profile, database, abort);
    }
if (conn != NULL)
    return sqlConnCacheAdd(cache, profile, conn);
else
    {
    assert(!abort);
    return NULL;
    }
}

static struct sqlConnection *sqlConnCacheDoAlloc(struct sqlConnCache *cache,
                                                 char *profileName,
                                                 char *database,
                                                 boolean abort)
/* Allocate a cached connection. errAbort if too many open connections.
 * errAbort if abort and connection fails. */
{
// obtain profile
struct sqlProfile *profile = NULL;
if ((cache->host != NULL) && (profileName != NULL))
    errAbort("can't specify profileName (%s) when sqlConnCache is create with a specific host (%s)",
             profileName, cache->host);
if ((profileName != NULL) && (cache->profile != NULL)
    && !sameString(profileName, cache->profile->name))
    errAbort("profile name %s doesn't match profile associated with sqlConnCache %s",
             profileName, cache->profile->name);
if (cache->profile != NULL)
    profile = cache->profile;
else
    profile = sqlProfileMustGet(profileName, database);

// try getting an entry, first trying to find one for this database, then
// look for any database, then add a new one
struct sqlConnCacheEntry *scce
    = sqlConnCacheFindFree(cache, profile, database, TRUE);
if (scce == NULL)
    {
    scce = sqlConnCacheFindFree(cache, profile, database, FALSE);
    if (scce != NULL)
        {
        if (!sqlConnCacheEntrySetDb(scce, database, abort))
            scce = NULL;  // got error with no abort
        }
    else
        scce = sqlConnCacheAddNew(cache, profile, database, abort);
    }
if (scce != NULL)
    {
    assert(scce->conn->isFree);
    scce->inUse = TRUE;
    scce->conn->isFree = FALSE;
    return scce->conn;
    }
else
    return NULL;
}

struct sqlConnection *sqlConnCacheMayAlloc(struct sqlConnCache *cache,
                                           char *database)
/* Allocate a cached connection. errAbort if too many open connections,
 * return NULL if can't connect to server. */
{
return sqlConnCacheDoAlloc(cache, NULL, database, FALSE);
}

struct sqlConnection *sqlConnCacheAlloc(struct sqlConnCache *cache,
                                        char *database)
/* Allocate a cached connection. */
{
return sqlConnCacheDoAlloc(cache, NULL, database, TRUE);
}

struct sqlConnection *sqlConnCacheProfileAlloc(struct sqlConnCache *cache,
                                               char *profileName,
                                               char *database)
/* Allocate a cached connection given a profile and/or database. */
{
return sqlConnCacheDoAlloc(cache, profileName, database, TRUE);
}

struct sqlConnection *sqlConnCacheProfileAllocMaybe(struct sqlConnCache *cache,
                                                    char *profileName,
                                                    char *database)
/* Allocate a cached connection given a profile and/or database. Return NULL
 * if the database doesn't exist.  */
{
return sqlConnCacheDoAlloc(cache, profileName, database, FALSE);
}

void sqlConnCacheDealloc(struct sqlConnCache *cache, struct sqlConnection **pConn)
/* Free up a cached connection. */
{
struct sqlConnection *conn = *pConn;
if (conn != NULL)
    {
    if (!conn->inCache)
        errAbort("sqlConnCacheDealloc called on connection that is not associated with a cache");
    assert(!conn->isFree);
    conn->isFree = TRUE;
    struct sqlConnCacheEntry *scce;
    for (scce = cache->entries; (scce != NULL) && (scce->conn != conn); scce = scce->next)
        continue;
    if (scce ==  NULL)
        errAbort("sqlConnCacheDealloc called on cache that doesn't contain "
                 "the given connection");
    scce->inUse = FALSE;
    *pConn = NULL;
    }
}

unsigned long sqlEscapeStringFull(char *to, const char* from, long fromLength)
/* Prepares a string for inclusion in a sql statement.  Output string
 * must be 2*strlen(from)+1. fromLength is the length of the from data.
 * Specifying fromLength allows one to encode a binary string that can contain any character including 0. */
{
return mysql_escape_string(to, from, fromLength);
}

// where am I using this? probably just cart.c and maybe cartDb.c ?
// but it is worth keeping just for the cart.
void sqlDyAppendEscaped(struct dyString *dy, char *s)
/* Append to dy an escaped s */
{
dyStringBumpBufSize(dy, dy->stringSize + strlen(s)*2);
int realSize = sqlEscapeString3(dy->string+dy->stringSize, s);
dy->stringSize += realSize;
}

unsigned long sqlEscapeString3(char *to, const char* from)
/* Prepares a string for inclusion in a sql statement.  Output string
 * must be 2*strlen(from)+1.  Returns actual escaped size not counting term 0. */
{
return sqlEscapeStringFull(to, from, strlen(from));
}

char *sqlEscapeString2(char *to, const char* from)
/* Prepares a string for inclusion in a sql statement.  Output string
 * must be 2*strlen(from)+1 */
{
sqlEscapeStringFull(to, from, strlen(from));
return to;
}

char *sqlEscapeString(const char* from)
/* Prepares string for inclusion in a SQL statement . Remember to free
 * returned string.  Returned string contains strlen(length)*2+1 as many bytes
 * as orig because in worst case every character has to be escaped.*/
{
int size = (strlen(from)*2) +1;
char *to = needMem(size * sizeof(char));
return sqlEscapeString2(to, from);
}

char *sqlEscapeTabFileString2(char *to, const char *from)
/* Escape a string for including in a tab seperated file. Output string
 * must be 2*strlen(from)+1 */
{
const char *fp = from;
char *tp = to;
while (*fp != '\0')
    {
    switch (*fp)
        {
        case '\\':
            *tp++ = '\\';
            *tp++ = '\\';
            break;
        case '\n':
            *tp++ = '\\';
            *tp++ = 'n';
            break;
        case '\t':
            *tp++ = '\\';
            *tp++ = 't';
            break;
        default:
            *tp++ = *fp;
            break;
        }
    fp++;
    }
*tp = '\0';
return to;
}

char *sqlEscapeTabFileString(const char *from)
/* Escape a string for including in a tab seperated file. Freez or freeMem
 * result when done. */
{
int size = (strlen(from)*2) +1;
char *to = needMem(size * sizeof(char));
return sqlEscapeTabFileString2(to, from);
}

static void addProfileDatabases(char *profileName, struct hash *databases)
/* find databases on a profile and add to hash */
{
struct sqlConnection *sc = sqlMayConnectProfile(profileName, NULL);
if (sc != NULL)
    {
    struct slName *db, *dbs = sqlGetAllDatabase(sc);
    for (db = dbs; db != NULL; db = db->next)
        hashAdd(databases, db->name, NULL);
    sqlDisconnect(&sc);
    slFreeList(&dbs);
    }
}

struct hash *sqlHashOfDatabases(void)
/* Get hash table with names of all databases that are online. */
{
if (profiles == NULL)
    sqlProfileLoad();
struct hash *databases = newHash(8);
// add databases found using default profile
addProfileDatabases(getDefaultProfileName(), databases);

// add databases found in failover profile
char *failoverProfName = catTwoStrings(failoverProfPrefix, getDefaultProfileName());
addProfileDatabases(failoverProfName, databases);
freez(&failoverProfName);

// add other databases explicitly associated with other profiles
struct hashCookie cookie = hashFirst(dbToProfile);
struct hashEl *hel;
while ((hel = hashNext(&cookie)) != NULL)
    {
    char *db = ((struct sqlProfile*)hel->val)->name;
    hashAdd(databases, db, NULL);
    }
return databases;
}

struct slName *sqlListOfDatabases(void)
/* Get list of all databases that are online. */
{
/* build hash and convert to names list to avoid duplicates due to visiting
 * multiple profiles to the same server */
struct hash *dbHash = sqlHashOfDatabases();
struct hashCookie cookie = hashFirst(dbHash);
struct hashEl *hel;
struct slName *dbs = NULL;
while ((hel = hashNext(&cookie)) != NULL)
    slSafeAddHead(&dbs, slNameNew(hel->name));
hashFree(&dbHash);
slSort(&dbs, slNameCmp);
return dbs;
}

boolean sqlWildcardIn(char *s)
/* Return TRUE if there is a sql wildcard char in string. */
{
char c;
while ((c = *s++) != 0)
    {
    if (c == '_' || c == '%')
        return TRUE;
    }
return FALSE;
}

char *sqlLikeFromWild(char *wild)
/* Convert normal wildcard string to SQL wildcard by
 * mapping * to % and ? to _.  Escape any existing % and _'s. */
{
int escCount = countChars(wild, '%') + countChars(wild, '_');
int size = strlen(wild) + escCount + 1;
char *retVal = needMem(size);
char *s = retVal, c;

while ((c = *wild++) != 0)
    {
    switch (c)
	{
	case '%':
	case '_':
	    *s++ = '\\';
	    *s++ = c;
	    break;
	case '*':
	    *s++ = '%';
	    break;
	case '?':
	    *s++ = '_';
	    break;
	default:
	    *s++ = c;
	    break;
	}
    }
return retVal;
}

long sqlDateToUnixTime(char *sqlDate)
/* Convert a SQL date such as "2003-12-09 11:18:43" to clock time
 * (seconds since midnight 1/1/1970 in UNIX). */
{
struct tm *tm = NULL;
long clockTime = 0;

if (sqlDate == NULL)
    errAbort("Null string passed to sqlDateToUnixTime()");
AllocVar(tm);
if (sscanf(sqlDate, "%4d-%2d-%2d %2d:%2d:%2d",
	   &(tm->tm_year), &(tm->tm_mon), &(tm->tm_mday),
	   &(tm->tm_hour), &(tm->tm_min), &(tm->tm_sec))  != 6)
    errAbort("Couldn't parse sql date \"%s\"", sqlDate);
tm->tm_year -= 1900;
tm->tm_mon  -= 1;
/* Ask mktime to determine whether Daylight Savings Time is in effect for
 * the given time: */
tm->tm_isdst = -1;
clockTime = mktime(tm);
if (clockTime < 0)
    errAbort("mktime failed (%d-%d-%d %d:%d:%d).",
	     tm->tm_year, tm->tm_mon, tm->tm_mday,
	     tm->tm_hour, tm->tm_min, tm->tm_sec);
freez(&tm);
return clockTime;
}

char *sqlUnixTimeToDate(time_t *timep, boolean gmTime)
/* Convert a clock time (seconds since 1970-01-01 00:00:00 unix epoch)
 *	to the string: "YYYY-MM-DD HH:MM:SS"
 *  returned string is malloced, can be freed after use
 *  boolean gmTime requests GMT time instead of local time
 */
{
struct tm *tm;
char *ret;

if (gmTime)
    tm = gmtime(timep);
else
    tm = localtime(timep);

ret = (char *)needMem(25*sizeof(char));  /* 25 is good for a billion years */

safef(ret, 25*sizeof(char), "%d-%02d-%02d %02d:%02d:%02d",
    1900+tm->tm_year, 1+tm->tm_mon, tm->tm_mday,
    tm->tm_hour, tm->tm_min, tm->tm_sec);
return(ret);
}

static int getUpdateFieldIndex(struct sqlResult *sr)
/* Return index of update field. 
 * Note: does NOT work on innoDB! */
{
static int updateFieldIndex = -1;
if (updateFieldIndex < 0)
    {
    int ix;
    char *name;
    for (ix=0; ;++ix)
        {
	name = sqlFieldName(sr);
	if (name == NULL)
	    errAbort("Can't find Update_time field in show table status result");
	if (sameString("Update_time", name))
	    {
	    updateFieldIndex = ix;
	    break;
	    }
	}
    }
return updateFieldIndex;
}

char *sqlTableUpdate(struct sqlConnection *conn, char *table)
/* Get last update time for table as an SQL string 
 * Note: does NOT work on innoDB! */
{
char query[512], **row;
struct sqlResult *sr;
int updateIx;
char *ret;
// "show table status" does not support db.table names, so temporarily change database
// if table is really db.table.
char *connDb = cloneString(sqlGetDatabase(conn));
char tableDb[1024];
char tableName[1024];
sqlParseDbDotTable(connDb, table, tableDb, sizeof tableDb, tableName, sizeof tableName);
boolean changeDb = differentStringNullOk(connDb, tableDb);
sqlSafef(query, sizeof(query), "show table status like '%s'", tableName);
// the failover strategy for failoverConn does not work for this command, 
// as it never returns an error. So we run this on the failover server
// if we have a failover connection and the table is not on the main server
boolean useFailOver = conn->failoverConn && !sqlTableExistsOnMain(conn, tableName);
if (useFailOver)
    {
    sqlConnectIfUnconnected(conn->failoverConn, TRUE);
    monitorPrintInfo(conn->failoverConn, "SQL_TABLE_STATUS_FAILOVER");
    if (changeDb)
        sqlConnChangeDb(conn->failoverConn, tableDb, TRUE);
    sr = sqlGetResult(conn->failoverConn, query);
    }
else
    {
    if (changeDb)
        sqlConnChangeDb(conn, tableDb, TRUE);
    sr = sqlGetResult(conn, query);
    }
updateIx = getUpdateFieldIndex(sr);
row = sqlNextRow(sr);
if (row == NULL)
    errAbort("Database table %s doesn't exist", table);
ret = cloneString(row[updateIx]);
sqlFreeResult(&sr);
if (changeDb)
    {
    if (useFailOver)
        sqlConnChangeDb(conn->failoverConn, connDb, TRUE);
    else
        sqlConnChangeDb(conn, connDb, TRUE);
    }
freeMem(connDb);
return ret;
}

time_t sqlTableUpdateTime(struct sqlConnection *conn, char *table)
/* Get last update time for table.
 * Note: does NOT work on innoDB! */
{
char *date = sqlTableUpdate(conn, table);
time_t time = sqlDateToUnixTime(date);
freeMem(date);
return time;
}

static char *sqlTablePropertyFromSchema(struct sqlConnection *conn, char *db, char *table, char *field)
/* Get table property. Table must exist or will abort. */
{
char query[512], **row;
struct sqlResult *sr;
char *ret;
char tableDb[1024];
char tableName[1024];
sqlParseDbDotTable(db, table, tableDb, sizeof tableDb, tableName, sizeof tableName);
sqlSafef(query, sizeof(query), 
    "SELECT %s FROM information_schema.TABLES"
    " WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'", field, tableDb, tableName);
// the failover strategy for failoverConn does not work for this command, 
// as it never returns an error. So we run this on the failover server
// if we have a failover connection and the table is not on the main server
if (conn->failoverConn && !sqlTableExistsOnMain(conn, tableName))
    {
    sqlConnectIfUnconnected(conn->failoverConn, TRUE);
    monitorPrintInfo(conn->failoverConn, "SQL_TABLE_STATUS_FAILOVER");
    sr = sqlGetResult(conn->failoverConn, query);
    }
else
    sr = sqlGetResult(conn, query);
row = sqlNextRow(sr);
if (row == NULL)
    errAbort("Database table %s or field %s doesn't exist", table, field);
ret = cloneString(row[0]);
sqlFreeResult(&sr);
return ret;
}

unsigned long sqlTableDataSizeFromSchema(struct sqlConnection *conn, char *db, char *table)
/* Get table data size. Table must exist or will abort. */
{
char *sizeString = sqlTablePropertyFromSchema(conn, db, table, "data_length");
return sqlUnsignedLong(sizeString);
}

unsigned long sqlTableIndexSizeFromSchema(struct sqlConnection *conn, char *db, char *table)
/* Get table index size. Table must exist or will abort. */
{
char *sizeString = sqlTablePropertyFromSchema(conn, db, table, "index_length");
return sqlUnsignedLong(sizeString);
}


char *sqlGetPrimaryKey(struct sqlConnection *conn, char *table)
/* Get primary key if any for table, return NULL if none. */
{
struct sqlResult *sr;
char **row;
char *key = NULL;
sr = sqlDescribe(conn, table);
while ((row = sqlNextRow(sr)) != NULL)
    {
    if (sameWord(row[3], "PRI"))
	{
        key = cloneString(row[0]);
	break;
	}
    }
sqlFreeResult(&sr);
return key;
}

char *sqlVersion(struct sqlConnection *conn)
/* Return version of MySQL database.  This will be something
 * of the form 5.0.18-standard. */
{
char query[64];
char **row;
sqlSafef(query, sizeof query, "show variables like 'version'");
struct sqlResult *sr = sqlGetResult(conn, query);
char *version = NULL;
if ((row = sqlNextRow(sr)) != NULL)
    version = cloneString(row[1]);
else
    errAbort("No mySQL version var.");
sqlFreeResult(&sr);
return version;
}

int sqlMajorVersion(struct sqlConnection *conn)
/* Return major version of database. */
{
char *s = sqlVersion(conn);
int ver;
if (!isdigit(s[0]))
    errAbort("Unexpected format in version: %s", s);
ver = atoi(s);		/* NOT sqlUnsigned please! */
freeMem(s);
return ver;
}

int sqlMinorVersion(struct sqlConnection *conn)
/* Return minor version of database. */
{
char *s = sqlVersion(conn);
char *words[5];
int ver;

chopString(s, ".", words, ArraySize(words));

if (!isdigit(*words[1]))
    errAbort("Unexpected format in version: %s", s);
ver = atoi(words[1]);           /* NOT sqlUnsigned please! */
freeMem(s);
return ver;
}

char** sqlGetEnumDef(struct sqlConnection *conn, char* table, char* colName)
/* Get the definitions of a enum column in a table, returning a
 * null-terminated array of enum values.  Free array when finished.  */
{
static char *enumPrefix = "enum(";
struct sqlResult *sr;
char **row;
char *defStr, *defStrCp;
int numValues, i;
char **enumDef;

/* get enum definition */
sr = sqlDescribe(conn, table);
while (((row = sqlNextRow(sr)) != NULL) && !sameString(row[0], colName))
    continue;
if (row == NULL)
    errAbort("can't find column %s in DESCRIBE of %s", colName, table);

/* parse definition in the form:
 * enum('unpicked','candidate',... ,'cantSequence') */
if (!startsWith(enumPrefix, row[1]))
    errAbort("%s column %s isn't an enum: %s", table, colName, row[1]);
defStr = row[1] + strlen(enumPrefix);

/* build char** array with string space in same block */
numValues = chopString(defStr, ",", NULL, 0);
enumDef = needMem(((numValues+1) * sizeof (char**)) + strlen(defStr)+1);
defStrCp = ((char*)enumDef) + ((numValues+1) * sizeof (char**));
strcpy(defStrCp, defStr);
chopString(defStrCp, ",", enumDef, numValues);

/* remove quotes */
for (i = 0; enumDef[i] != NULL; i++)
    {
    int len = strlen(enumDef[i]);
    if (enumDef[i+1] == NULL)
        len--;  /* last entry hash close paren */
    if ((enumDef[i][0] != '\'') || (enumDef[i][len-1] != '\''))
        errAbort("can't find quotes in %s column %s enum value: %s",
                 table, colName, enumDef[i]);
    enumDef[i][len-1] = '\0';
    enumDef[i]++;
    }

sqlFreeResult(&sr);
return enumDef;
}

struct slName *sqlRandomSampleWithSeedConn(struct sqlConnection *conn, char *table, char *field, int count, int seed)
/* Get random sample from database specifiying rand number seed, or -1 for none */
{
char query[256], **row;
struct sqlResult *sr;
struct slName *list = NULL, *el;
char seedString[256] = "";
/* The randomized-order, distinct-ing query can take a very long time on
 * very large tables.  So create a smaller temporary table and use that.
 * The temporary table is visible only to the current connection, so
 * doesn't have to be very uniquely named, and will disappear when the
 * connection is closed. */
/* check if table has 'db.' prefix in it */
char *plainTable = strrchr(table, '.');
if (plainTable)
    plainTable++;
else
    plainTable = table;
sqlSafef(query, sizeof(query),
      "create temporary table hgTemp.tmp%s select %s from %s limit 100000",
      plainTable, field, table);
sqlUpdate(conn, query);
if (seed != -1)
    safef(seedString,sizeof(seedString),"%d",seed);
sqlSafef(query, sizeof(query), "select distinct %s from hgTemp.tmp%s "
      "order by rand(%s) limit %d",
      field, plainTable, seedString, count);
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    {
    el = slNameNew(row[0]);
    slAddHead(&list, el);
    }
sqlFreeResult(&sr);
return list;
}

struct slName *sqlRandomSampleWithSeed(char *db, char *table, char *field, int count, int seed)
/* Get random sample from database specifiying rand number seed, or -1 for none */
{
struct sqlConnection *conn = sqlConnect(db);
return sqlRandomSampleWithSeedConn(conn, table, field, count, seed);
sqlDisconnect(&conn);
}

struct slName *sqlRandomSampleConn(struct sqlConnection *conn, char *table,
				   char *field, int count)
/* Get random sample from conn. */
{
return sqlRandomSampleWithSeedConn(conn, table, field, count, -1);
}

struct slName *sqlRandomSample(char *db, char *table, char *field, int count)
/* Get random sample from database. */
{
return sqlRandomSampleWithSeed(db, table, field, count, -1);
}


bool sqlCanCreateTemp(struct sqlConnection *conn)
/* Return True if it looks like we can write temp tables into the current database
 * Can be used to check if sqlRandomSampleWithSeed-functions are safe to call.
 * */
{
// assume we can write if the current connection has no failOver connection 
if (conn->failoverConn==NULL)
    {
    return TRUE;
    }

char *err;
unsigned int errNo;

// try a create temp query
char *query = "CREATE TEMPORARY TABLE testTemp (number INT); DROP TABLE testTemp;";
struct sqlResult *sr = sqlGetResultExt(conn, query, &errNo, &err);
if (sr==NULL)
    {
    return FALSE;
    }

sqlFreeResult(&sr);
return TRUE;
}

static struct sqlFieldInfo *sqlFieldInfoParse(char **row)
/* parse a row into a sqlFieldInfo object */
{
struct sqlFieldInfo *fi;
AllocVar(fi);
fi->field = cloneString(row[0]);
fi->type = cloneString(row[1]);
fi->allowsNull = sameString(row[2], "YES");
fi->key = cloneString(row[3]);
fi->defaultVal = cloneString(row[4]);
fi->extra = cloneString(row[5]);
return fi;
}

struct sqlFieldInfo *sqlFieldInfoGet(struct sqlConnection *conn, char *table)
/* get a list of objects describing the fields of a table */
{
char query[512];
struct sqlResult *sr;
char **row;
struct sqlFieldInfo *fiList = NULL;

sqlSafef(query, sizeof(query), "SHOW COLUMNS FROM %s", table);
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    slSafeAddHead(&fiList, sqlFieldInfoParse(row));
sqlFreeResult(&sr);
slReverse(&fiList);
return fiList;
}

static void sqlFieldInfoFree(struct sqlFieldInfo **fiPtr)
/* Free a sqlFieldInfo object */
{
struct sqlFieldInfo *fi = *fiPtr;
if (fi != NULL)
    {
    freeMem(fi->field);
    freeMem(fi->type);
    freeMem(fi->key);
    freeMem(fi->defaultVal);
    freeMem(fi->extra);
    freeMem(fi);
    *fiPtr = NULL;
    }
}

void sqlFieldInfoFreeList(struct sqlFieldInfo **fiListPtr)
/* Free a list of sqlFieldInfo objects */
{
struct sqlFieldInfo *fi;
while ((fi = slPopHead(fiListPtr)) != NULL)
       sqlFieldInfoFree(&fi);
}

void *sqlVaQueryObjs(struct sqlConnection *conn, sqlLoadFunc loadFunc,
                     unsigned opts, char *queryFmt, va_list args)
/* Generate a query from format and args.  Load one or more objects from rows
 * using loadFunc.  Check the number of rows returned against the sqlQueryOpts
 * bit set.  Designed for use with autoSql, although load function must be
 * cast to sqlLoadFunc. */
{
char query[1024];
struct slList *objs = NULL;
struct sqlResult *sr;
char **row;

vaSqlSafef(query, sizeof(query), queryFmt, args);
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL) {
slSafeAddHead(&objs, loadFunc(row));
}
sqlFreeResult(&sr);
slReverse(&objs);

/* check what we got against the options */
if (objs == NULL)
    {
    if (opts & sqlQueryMust)
        errAbort("no results return from query: %s", query);
    }
else if ((opts & sqlQuerySingle) && (objs->next != NULL))
    errAbort("one results, got %d, from query: %s", slCount(objs), query);
return objs;
}

/* Generate a query from format and args.  Load one or more objects from rows
 * using loadFunc.  Check the number of rows returned against the sqlQueryOpts
 * bit set. */
void *sqlQueryObjs(struct sqlConnection *conn, sqlLoadFunc loadFunc,
                   unsigned opts, char *queryFmt, ...)
/* Generate a query from format and args.  Load one or more objects from rows
 * using loadFunc.  Check the number of rows returned against the sqlQueryOpts
 * bit set.  Designed for use with autoSql, although load function must be
 * cast to sqlLoadFunc. */
{
struct slList *objs = NULL;
va_list args;
va_start(args, queryFmt);
objs = sqlVaQueryObjs(conn, loadFunc, opts, queryFmt, args);
va_end(args);
return objs;
}

int sqlSaveQuery(struct sqlConnection *conn, char *query, char *outPath, boolean isFa)
/* Execute query, save the resultset as a tab-separated file.
 * If isFa is true, than assume it is a two column fasta query and format accordingly.
 * Return count of rows in result set.  Abort on error. */
{
struct sqlResult *sr;
char **row;
char *sep="";
int c = 0;
int count = 0;
int numCols = 0;
FILE *f = mustOpen(outPath,"w");
sr = sqlGetResult(conn, query);
numCols = sqlCountColumns(sr);
while ((row = sqlNextRow(sr)) != NULL)
    {
    sep="";
    if (isFa)
	sep = ">";
    for (c=0;c<numCols;++c)
	{
	fprintf(f,"%s%s",sep,row[c]);
	sep = "\t";
	if (isFa)
	    sep = "\n";
	}
    fprintf(f,"\n");
    ++count;
    }
sqlFreeResult(&sr);
carefulClose(&f);
return count;
}

char *sqlTempTableName(struct sqlConnection *conn, char *prefix)
/* Return a name for a temporary table. Name will start with
 * prefix.  This call doesn't actually  make table.  (So you should
 * make table before next call to insure uniqueness.)  However the
 * table name encorperates the host, pid, and time, which helps insure
 * uniqueness between different processes at least.  FreeMem the result
 * when you are done. */
{
int i;
char tableName[PATH_LEN];
for (i=0; ;i++)
    {
    char *x = semiUniqName(prefix);
    safef(tableName, sizeof(tableName), "%s%d", x, i);
    if (!sqlTableExists(conn, tableName))
        break;
    }
return cloneString(tableName);
}

void sqlSetParanoid(boolean beParanoid)
/* If set to TRUE, will make more diagnostic stderr messages. */
{
sqlParanoid = beParanoid;
}

boolean sqlIsRemote(struct sqlConnection *conn)
/* test if the conn appears to be to a remote system.
 * Current only tests for a TCP/IP connection */
{
return (conn->conn->unix_socket == NULL);
}

static void sqlDumpProfile(struct sqlProfile *sp, FILE *fh)
/* dump one db profile */
{
fprintf(fh, "profile: %s host: %s user: %s dbs:", sp->name, sp->host, sp->user);
struct slName *db;
for (db = sp->dbs; db != NULL; db = db->next)
    fprintf(fh, " %s", db->name);
fputc('\n', fh);
}

static void sqlDumpConnection(struct sqlConnection *conn, FILE *fh)
/* dump an sql connection for debugging */
{
fprintf(fh, "conn: profile: %s host: %s db: %s results: %d",
        conn->profile->name, conn->conn->host, conn->conn->db, dlCount(conn->resultList));
if (conn->hasHardLock)
    fputs(" hardLocked", fh);
if (conn->inCache)
    fputs(" cached", fh);
if (conn->isFree)
    fputs(" free", fh);
fputc('\n', fh);
}

void sqlDump(FILE *fh)
/* dump internal info about SQL configuration for debugging purposes */
{
static char *dashes = "--------------------------------------------------------";
fprintf(fh, "%s\n", dashes);
fprintf(fh, "defaultProfile=%s\n", (defaultProfile != NULL) ? defaultProfile->name : "NULL");
struct hashCookie cookie = hashFirst(profiles);
struct hashEl *hel;
while((hel = hashNext(&cookie)) != NULL)
    sqlDumpProfile(hel->val, fh);

cookie = hashFirst(dbToProfile);
while((hel = hashNext(&cookie)) != NULL)
    fprintf(fh, "db: %s profile: %s\n", hel->name, ((struct sqlProfile*)hel->val)->name);

struct dlNode *connNode;
for (connNode = sqlOpenConnections->head; !dlEnd(connNode); connNode = connNode->next)
    sqlDumpConnection(connNode->val, fh);
fprintf(fh, "%s\n", dashes);
}

void sqlPrintStats(FILE *fh)
/* print statistic about the number of connections and other options done by
 * this process. */
{
fprintf(fh, "sqlStats: connects: %d maxOpen: %d\n", totalNumConnects, maxNumConnections);
}
    
/* --------- input checks to prevent sql injection --------------------------------------- */

// 0 means char is allowed
// 1 means char is disallowed

// although the mysql escape function can escape binary data,
// we don't need to support escaping 0 for strings here.

static boolean sqlCheckAllowedChars(char *s, char disAllowed[256])
/* Check each character of input against allowed character set */
{
if (!s)
    {
    sqlCheckError("sqlCheckAllowedChars - Cannot check NULL");
    return FALSE;
    }
char *sOriginal = s;
unsigned char c;
while((c = *s++) != 0)
    {
    if (disAllowed[c])
	{
	// just using this as a work-around
	// until the problem with early errors and warn/abort stacks has been fixed.
	char *noSqlInjLevel = cfgOptionDefault("noSqlInj.level", "abort");
	if (!sameString(noSqlInjLevel, "ignore"))
	    {
    	    fprintf(stderr, "character %c disallowed in sql string part %s\n", c, sOriginal);  
	    fflush(stderr);
	    }

	return FALSE;
	}
    }
return TRUE;
}

static void sqlCheckDisallowAllChars(char disAllowed[256])
/* Disallow all chars by setting to 1 */
{
int i;
for(i=0;i<256;++i)
    disAllowed[i] = 1;
}

static void sqlCheckAllowLowerChars(char allowed[256])
/* Allow lower case chars by setting to 0 */
{
unsigned char c;
for(c='a';c<='z';++c)
    allowed[c] = 0;
}

static void sqlCheckAllowUpperChars(char allowed[256])
/* Allow upper case chars by setting to 0 */
{
unsigned char c;
for(c='A';c<='Z';++c)
    allowed[c] = 0;
}

static void sqlCheckAllowDigitChars(char allowed[256])
/* Allow digit chars by setting to 0 */
{
unsigned char c;
for(c='0';c<='9';++c)
    allowed[c] = 0;
}

static void sqlCheckAllowChar(unsigned char c, char allowed[256])
/* Allow a char by setting to 0 */
{
allowed[c] = 0;
}

static void sqlCheckAllowAlphaChars(char allowed[256])
/* Allow all chars by setting to 0 */
{
sqlCheckAllowUpperChars(allowed);
sqlCheckAllowLowerChars(allowed);
}

static void sqlCheckAllowAlphaNumChars(char allowed[256])
/* Allow all chars by setting to 0 */
{
sqlCheckAllowAlphaChars(allowed);
sqlCheckAllowDigitChars(allowed);
}

/* Currently used 10 times in the code via define sqlCkIl. */
char *sqlCheckIdentifiersListExt(char *identifiers)
/* Check that only valid identifier characters are used in a comma-separated list
 * '.' is allowed also since some code uses it in place of an actual field name.
 * See hgTables/bedList.c::bedSqlFieldsExceptForChrom(). */
{
static boolean init = FALSE;
static char allowed[256];
if (!init)
    {
    sqlCheckDisallowAllChars(allowed);
    sqlCheckAllowAlphaNumChars(allowed);
    sqlCheckAllowChar('.', allowed);
    sqlCheckAllowChar('_', allowed);
    // sqlTableExists looks like a single table check, but apparently it has become abused
    // to support multiple tables e.g. sqlTableExists 
    sqlCheckAllowChar(' ', allowed);
    sqlCheckAllowChar(',', allowed);
    sqlCheckAllowChar('\'', allowed); // single quote allowed for special case fieldname is '.'
    // NOTE it is important for security that no other characters be allowed here
    init = TRUE;
    }
if (sameString(identifiers, "*"))  // exception allowed
    return identifiers;
if (!sqlCheckAllowedChars(identifiers, allowed))
    {
    sqlCheckError("Illegal character found in identifier list %s", identifiers);
    return identifiers;
    }
// Unfortunately, just checking that the characters are legal is far from enough to ensure safety.
// the comma is required. Currently aliases and tick quotes are not supported.
int len = strlen(identifiers);
char c = 0;
int i = 0;
boolean needText = TRUE;
boolean spaceOk = FALSE;
boolean textDone = FALSE;
// Currently identifiers list must start with an identifier, no leading spaces or comma allowed.
// Currently the comma must immediately follow the identifier
// Currently zero or one spaces may follow the comma
// List should end with an identifier. No trailing comma or space allowed.
// NOTE it is important for security that commas separate values.
// We do not want to support multiple words separated by spaces.
while (i < len)
    {
    c = identifiers[i];
    if (c == ' ')
	{
	if (!spaceOk)
	    {
	    sqlCheckError("Invalid Identifiers List [%s] unexpected space character", identifiers);
	    return identifiers;
	    }
	spaceOk = FALSE;
	}
    else if (c == ',')
	{
	if (needText)
	    {
	    sqlCheckError("Invalid Identifiers List [%s] unexpected comma character", identifiers);
	    return identifiers;
	    }
	spaceOk = TRUE;
	needText = TRUE;
	textDone = FALSE;
	}
    else // other chars are part of the identifier
	{
	if (textDone)
	    {
	    sqlCheckError("Invalid Identifiers List [%s] expected comma", identifiers);
	    return identifiers;
	    }
	needText = FALSE;
	spaceOk = FALSE;
	if (c == '\'') // check for '.' exception allowed
	    {
	    if (i+strlen("'.'") > len)
		{
		sqlCheckError("Invalid Identifiers List [%s] quoted-literal not supported", identifiers);
		return identifiers;
		}
	    if (identifiers[i+1] != '.') // next char must be a period
		{
		sqlCheckError("Invalid Identifiers List [%s] quoted-literal not supported", identifiers);
		return identifiers;
		}
	    if (identifiers[i+2] != '\'') // next char must be a single-quote
		{
		sqlCheckError("Invalid Identifiers List [%s] quoted-literal not supported", identifiers);
		return identifiers;
		}
	    i += 2;
	    textDone = TRUE;
	    }
	}
    
    ++i;	    
    }
if (needText || spaceOk)
    {
    sqlCheckError("Invalid Identifiers List [%s] unexpected trailing comma or space character", identifiers);
    return identifiers;
    }

return identifiers;
}

void sqlCheckIdentifiersList(char* buffer, int bufSize, char *identifiers)
/* Check that only valid identifier characters are used in a comma-separated list
 * '.' is allowed also since some code uses it in place of an actual field name.
 * See hgTables/bedList.c::bedSqlFieldsExceptForChrom().
 * Save safe output to char array */
{
sqlCheckIdentifiersListExt(identifiers);
safef(buffer, bufSize, NOSQLINJ "%s", identifiers);
}

char *sqlCheckIdentifier(char *identifier)
/* Check that only valid identifier characters are used */
{
static boolean init = FALSE;
static char allowed[256];
if (!init)
    {
    sqlCheckDisallowAllChars(allowed);
    sqlCheckAllowAlphaNumChars(allowed);
    sqlCheckAllowChar('.', allowed);
    sqlCheckAllowChar('_', allowed);
    // NOTE it is important for security that no other characters be allowed here
    init = TRUE;
    }
/* A good idea but code is currently using empty in table names at least. 
See src/hg/lib/gtexTissue.c:
select * from gtexTissue%s order by id
This could be re-worked someday, but not now. refs #22596
if (identifier[0] == 0) // empty string not allowed since this is usually caused by an error.
    {
    sqlCheckError("Illegal empty string identifier not allowed.");
    }
*/
if (!sqlCheckAllowedChars(identifier, allowed))
    {
    sqlCheckError("Illegal character found in identifier %s", identifier);
    }
return identifier;
}



/* --------------------------- */

int sqlEscapeAllStrings(char *buffer, char *s, int bufSize, char escPunc)
/* Escape all strings demarked by escPunc char. *
 * Returns final size not including terminating 0. 
 * User needs to pre-allocate enough space that mysql_escape will never run out of space.
 * This function should be efficient on statements with many strings to be escaped. */
{
char *sOrig = s;
int sz = 0;
int remainder = bufSize;
boolean done = FALSE;
while (1)
    {
    char *start = strchr(s, escPunc);
    char *end = NULL;
    if (start)
	{
    	end = strchr(start+1, escPunc); // skip over punc mark
	if (!end)
	    errAbort("Unexpected error in sqlEscapeAllStrings. s=[%s]", sOrig);
	}
    else
	{
	// just copy remainder of the input string to output
    	start = strchr(s, 0); // find end of string
	done = TRUE;	
	}
    // move any non-escaped part
    int moveSize = start - s;
    if (moveSize > remainder)
	errAbort("Buffer too small in sqlEscapeAllStrings. s=[%s] bufSize = %d", sOrig, bufSize);
    memmove(buffer, s, moveSize);
    buffer += moveSize;
    sz += moveSize;
    remainder -= moveSize;
    if (done)
	{
	if (remainder < 1)
	    errAbort("Buffer too small for termintating zero in sqlEscapeAllStrings. s=[%s] bufSize = %d", sOrig, bufSize);
	--remainder;
	*buffer++ = 0;  // terminating 0
	// do not include term 0 in sz count;
	break;
	}
    // escape the quoted part
    s = start + 1;
    *end = 0;  // mark end of "input" string, replacing escPunc. input string is temporary anyway.
    int inputSize = end - s;
    int worstCase = inputSize*2 + 1;
    if (worstCase > remainder)
	errAbort("Buffer too small for escaping in sqlEscapeAllStrings. s=[%s] bufSize = %d", sOrig, bufSize);
    int escSize = mysql_escape_string(buffer, s, inputSize);
    buffer += escSize;
    sz += escSize;
    remainder -= escSize;
    s = end + 1;	
    }
return sz;
}

struct restoreSafeStr
    {
    struct restoreSafeStr *next;
    char *s;
    int strLen;
    };

int vaSqlSafefNoAbort(char* buffer, int bufSize, boolean newString, char *format, va_list args)
/* VarArgs Format string to buffer, vsprintf style, only with buffer overflow
 * checking.  The resulting string is always terminated with zero byte.
 * Scans string parameters for illegal sql chars. 
 * Automatically escapes quoted string values.
 * This function should be efficient on statements with many strings to be escaped. */
{
va_list orig_args;
va_copy(orig_args, args);
int formatLen = strlen(format);

char escPunc = 0x01;  // using char 1 as special char to denote strings needing escaping
char *newFormat = NULL;
int newFormatSize = 2*formatLen + 1;
if (newString)
    newFormatSize += NOSQLINJ_SIZE;
newFormat = needMem(newFormatSize);
char *nf = newFormat;
if (newString)
    nf += safef(newFormat, newFormatSize, "%s", NOSQLINJ "");
char *lastPct = NULL;
int escStringsCount = 0;
int escStringsSize = 0;

struct restoreSafeStr *restoreSafeStrList=NULL, *restoreSafeStr=NULL;

char c = 0;
int i = 0;
char quote = 0;
boolean inPct = FALSE;
boolean isLong = FALSE;
boolean isLongLong = FALSE;
boolean isNegated = FALSE;
while (i < formatLen)
    {
    c = format[i];
    *nf++ = c;
    // start quote
    if (quote==0 && (c == '\'' || c == '"' || c == '`'))
	quote = c;
    // end quote
    else if (c == quote)
	quote = 0;
    else if (c == '%' && !inPct)
	{
	inPct = TRUE;
	lastPct = nf - 1;  // remember where the start was.
	}
    else if (c == '%' && inPct)
	inPct = FALSE;
    else if (inPct) 
        {
	if (c == 'l')
	    {
	    if (isLong)
		isLongLong = TRUE;
	    else
		isLong = TRUE;
	    }
	else if (strchr("diuoxXeEfFgGpcs",c))
	    {
	    inPct = FALSE;
	    // convert to equivalent types
	    if (c == 'i') c = 'd';  
	    if (c == 'E') c = 'e';  
	    if (c == 'F') c = 'f';  
	    if (c == 'G') c = 'g';  
	    if (c == 'o') c = 'u';  
	    if (c == 'x') c = 'u';  
	    if (c == 'X') c = 'u';  
	    // we finally have the expected format
	    // for all except s, we just want to skip it, but va_arg is the only way to do it!
	    // signed integers
	    if      (c == 'd' && !isLong)               { va_arg(args,                    int); }
	    else if (c == 'd' && isLong && !isLongLong) { va_arg(args,               long int); }
	    else if (c == 'd' && isLong && isLongLong)  { va_arg(args,          long long int); }
	    // unsigned integers
	    else if (c == 'u' && !isLong)               { va_arg(args, unsigned           int); }
	    else if (c == 'u' && isLong && !isLongLong) { va_arg(args, unsigned      long int); }
	    else if (c == 'u' && isLong && isLongLong)  { va_arg(args, unsigned long long int); }
	    else if (c == 'e')                          { va_arg(args,                 double); }
	    else if (c == 'f')                          { va_arg(args,                 double); }
	    else if (c == 'g')                          { va_arg(args,                 double); }
	    // pointer is void *
	    else if (c == 'p')                          { va_arg(args,                 void *); }
	    // char get promoted to int by varargs process
	    else if (c == 'c')                          { va_arg(args,                    int); }
	    // finally, the string we care about!
	    else if (c == 's')
		{
		char *s = va_arg(args, char *);
		if (s == NULL)
    		    sqlCheckError("%%s value is NULL which is incorrect.");
		if (quote == 0)
		    { // check identifier
		    if (!isNegated) // Not a Pre-escaped String
			sqlCheckIdentifier(s);
		    else  
			{
			if (startsWith(NOSQLINJ, s))
			    {
			    // wipe out the prefix by removing from the input string s
			    int strLen = strlen(s);
			    memmove(s, s+NOSQLINJ_SIZE, strLen - NOSQLINJ_SIZE + 1);
			    AllocVar(restoreSafeStr);
			    restoreSafeStr->s = s;
			    restoreSafeStr->strLen = strLen;
			    slAddHead(&restoreSafeStrList, restoreSafeStr);
			    }
			else
			    {
			    sqlCheckError("Internal Error: Input to %%-s should be created with safe functions.");
			    // will continue here if non-abort level chosen.
			    }
			}
		    }
		else
		    { // check quoted literal
		    if (!isNegated) // Not a Pre-escaped String
			{
			// go back and insert escPunc before the leading % char saved in lastPct
			// move the accumulated %s descriptor
			memmove(lastPct+1, lastPct, nf - lastPct); // this is typically very small, src and dest overlap.
			++nf;
			*lastPct  = escPunc;
			*nf++ = escPunc;
			++escStringsCount;
			if (s == NULL)
			    {
			    escStringsSize += strlen("(null)");
			    }
			else
			    {
			    escStringsSize += strlen(s);
			    }
			}
		    else  // quoted -s has no meaning or use, so not allow.
			{
			sqlCheckError("quoted -s in format string is not allowed.");
			}
		    }
		}
	    else
		{
		errAbort("unexpected error processing vaSqlSafef, format: %s", format);
		}		

	    isLong = FALSE;
	    isLongLong = FALSE;
	    isNegated = FALSE;
	    }
	else if (strchr("+-.1234567890",c))
	    {
	    if (c == '-')
		isNegated = TRUE;
	    }
	else
	    errAbort("string format not understood in vaSqlSafef: %s", format);
	}
    ++i;	    
    }

int sz = 0; 
if (escStringsCount > 0)
    {
    int tempSize = bufSize + 2*escStringsCount;  // if it won't fit in this it will never fit.
    char *tempBuf = needMem(tempSize);
    sz = vsnprintf(tempBuf, tempSize, newFormat, orig_args);
    /* note that some versions return -1 if too small */
    if (sz != -1 && sz + 1 <= tempSize)
	{
	// unfortunately we have to copy the string 1 more time unless we want
	// to force the user to allocate extra "safety space" for mysql_escape.
	int tempSize2 = sz + 1 + escStringsSize;  // handle worst-case
	char *tempBuf2 = needMem(tempSize2);
	sz = sqlEscapeAllStrings(tempBuf2, tempBuf, tempSize2, escPunc);
	if (sz + 1 > tempSize2)
	    errAbort("unexpected error in vaSqlSafefNoAbort: tempBuf2 overflowed. tempSize2=%d sz=%d", tempSize, sz); 
	if (sz + 1 <= bufSize) // NO buffer overflow
	    {
	    // copy string to its final destination.
	    memmove(buffer, tempBuf2, sz+1); // +1 for terminating 0;
	    }
	freeMem(tempBuf2);
	}
    freeMem(tempBuf);
    }
else
    {
    sz = vsnprintf(buffer, bufSize, newFormat, orig_args);
    /* note that some version return -1 if too small */
    }

freeMem(newFormat);
va_end(orig_args);

// Restore prefixes which were removed from string pointer args with %-s
for (restoreSafeStr = restoreSafeStrList; restoreSafeStr; restoreSafeStr=restoreSafeStr->next)
    {
    memmove(restoreSafeStr->s+NOSQLINJ_SIZE, restoreSafeStr->s, restoreSafeStr->strLen - NOSQLINJ_SIZE + 1);
    memmove(restoreSafeStr->s, NOSQLINJ, NOSQLINJ_SIZE);
    }
slFreeList(&restoreSafeStrList);

return sz;

}





int vaSqlSafef(char* buffer, int bufSize, char *format, va_list args)
/* VarArgs Format string to buffer, vsprintf style, only with buffer overflow
 * checking.  The resulting string is always terminated with zero byte. */
{
int sz = vaSqlSafefNoAbort(buffer, bufSize, TRUE, format, args);
if ((sz < 0) || (sz >= bufSize))
    {
    buffer[bufSize-1] = (char) 0;
    errAbort("buffer overflow, size %d, format: %s, buffer: '%s'", bufSize, format, buffer);
    }
return sz;
}

int sqlSafef(char* buffer, int bufSize, char *format, ...)
/* Format string to buffer, vsprintf style, only with buffer overflow
 * checking.  The resulting string is always terminated with zero byte. 
 * Scans unquoted string parameters for illegal literal sql chars.
 * Escapes quoted string parameters. 
 * NOSLQINJ tag is added to beginning. */
{
int sz;
va_list args;
va_start(args, format);
sz = vaSqlSafef(buffer, bufSize, format, args);
va_end(args);
return sz;
}


/* --------------------------- */


void vaSqlDyStringPrintf(struct dyString *ds, char *format, va_list args)
/* VarArgs Printf to end of dyString after scanning string parameters for illegal sql chars.
 * Strings inside quotes are automatically escaped.  
 * NOSLQINJ tag is added to beginning if it is a new empty string. */
{
/* attempt to format the string in the current space.  If there
 * is not enough room, increase the buffer size and try again */
int avail, sz;
while (TRUE)
    {
    va_list argscp;
    va_copy(argscp, args);
    avail = ds->bufSize - ds->stringSize;
    if (avail <= 0)
        {
        /* Don't pass zero sized buffers to vsnprintf, because who knows
         * if the library function will handle it. */
        dyStringBumpBufSize(ds, ds->bufSize+ds->bufSize);
        avail = ds->bufSize - ds->stringSize;
        }
    if (ds->stringSize > 0 && !startsWith(NOSQLINJ, ds->string))
	{
	sqlCheckError("sqlDyPrintf called on non-empty non-safe string.");
	}
	
    sz = vaSqlSafefNoAbort(ds->string + ds->stringSize, avail, ds->stringSize==0, format, argscp);
    va_end(argscp);

    /* note that some version return -1 if too small */
    if ((sz < 0) || (sz >= avail))
	{
        dyStringBumpBufSize(ds, ds->bufSize+ds->bufSize);
	}
    else
        {
        ds->stringSize += sz;
        break;
        }
    }
}

void sqlDyStringPrintf(struct dyString *ds, char *format, ...)
/* Printf to end of dyString after scanning string parameters for illegal sql chars.
 * Strings inside quotes are automatically escaped.  
 * NOSLQINJ tag is added to beginning if it is a new empty string. 
 * Appends to existing string. */
{
va_list args;
va_start(args, format);
vaSqlDyStringPrintf(ds, format, args);
va_end(args);
}

struct dyString *sqlDyStringCreate(char *format, ...)
/* Create a dyString with a printf style initial content 
 * Adds the NOSQLINJ prefix. */
{
int len = strlen(format) * 3;
struct dyString *ds = dyStringNew(len);
va_list args;
va_start(args, format);
vaSqlDyStringPrintf(ds, format, args);
va_end(args);
return ds;
}

void sqlDyStringPrintIdList(struct dyString *ds, char *fields)
/* Append a comma-separated list of field identifiers. Aborts if invalid characters in list. */
{
sqlCkIl(fieldsSafe, fields)
sqlDyStringPrintf(ds, "%-s", fieldsSafe);
}


void sqlDyStringPrintValuesList(struct dyString *ds, struct slName *list)
/* Append a comma-separated, quoted and escaped list of values. */
{
struct slName *el;
for (el = list; el != NULL; el = el->next)
    {
    if (el != list)
	sqlDyStringPrintf(ds, ",");
    sqlDyStringPrintf(ds, "'%s'", el->name);
    }
}

void sqlCheckError(char *format, ...)
/* A sql injection error has occurred. Check for settings and respond
 * as appropriate with error, warning, logOnly, ignore, dumpstack.
 * Then abort if needed. NOTE: unless it aborts, this function will return! */
{
va_list args;
va_start(args, format);

char *noSqlInjLevel = cfgOptionDefault("noSqlInj.level", "abort");
char *noSqlInjDumpStack = cfgOption("noSqlInj.dumpStack");

if (sameOk(noSqlInjDumpStack, "on"))
    {
    va_list dump_args;
    va_copy(dump_args, args);
    vaDumpStack(format, dump_args);
    va_end(dump_args);
    }

if (sameString(noSqlInjLevel, "logOnly"))
    {
    vfprintf(stderr, format, args);
    fprintf(stderr, "\n");
    fflush(stderr);
    }

if (sameString(noSqlInjLevel, "warn"))
    {
    vaWarn(format, args);
    }

if (sameString(noSqlInjLevel, "abort"))
    {
    vaErrAbort(format, args);
    }

va_end(args);

}

/* functions moved here from hgTables.c 2019-04-04 - Hiram */
struct sqlFieldType *sqlFieldTypeNew(char *name, char *type)
/* Create a new sqlFieldType */
{
struct sqlFieldType *ft;
AllocVar(ft);
ft->name = cloneString(name);
ft->type = cloneString(type);
return ft;
}

void sqlFieldTypeFree(struct sqlFieldType **pFt)
/* Free resources used by sqlFieldType */
{
struct sqlFieldType *ft = *pFt;
if (ft != NULL)
    {
    freeMem(ft->name);
    freeMem(ft->type);
    freez(pFt);
    }
}

void sqlFieldTypeFreeList(struct sqlFieldType **pList)
/* Free a list of dynamically allocated sqlFieldType's */
{
struct sqlFieldType *el, *next;

for (el = *pList; el != NULL; el = next)
    {
    next = el->next;
    sqlFieldTypeFree(&el);
    }
*pList = NULL;
}

struct sqlFieldType *sqlFieldTypesFromAs(struct asObject *as)
/* Convert asObject to list of sqlFieldTypes */
{
struct sqlFieldType *ft, *list = NULL;
struct asColumn *col;
for (col = as->columnList; col != NULL; col = col->next)
    {
    struct dyString *type = asColumnToSqlType(col);
    ft = sqlFieldTypeNew(col->name, type->string);
    slAddHead(&list, ft);
    dyStringFree(&type);
    }
slReverse(&list);
return list;
}

struct sqlFieldType *sqlListFieldsAndTypes(struct sqlConnection *conn, char *table)
/* Get list of fields including their names and types.  The type currently is
 * just a MySQL type string. */
{
struct sqlFieldType *ft, *list = NULL;
char query[512];
struct sqlResult *sr;
char **row;
sqlSafef(query, sizeof(query), "describe %s", table);
sr = sqlGetResult(conn, query);
while ((row = sqlNextRow(sr)) != NULL)
    {
    ft = sqlFieldTypeNew(row[0], row[1]);
    slAddHead(&list, ft);
    }
sqlFreeResult(&sr);
slReverse(&list);
return list;
}

const char *sqlLastError(struct sqlConnection *sc)
/* Return the last error from a sql connection. */
{
return mysql_error(sc->conn);
}
