mirror of
https://github.com/jorisvink/kore
synced 2025-03-09 12:39:01 -04:00
Introduce synchronous pgsql queries.
Semantics for using pgsql API have changed quite heavily with this commit. See the examples for more information. Based on Github issue #95 by PauloMelo (paulo.melo@vintageform.pt) with several modifications by me.
This commit is contained in:
parent
8368f6d471
commit
a281fd5713
5
examples/pgsql-sync/.gitignore
vendored
Normal file
5
examples/pgsql-sync/.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
*.o
|
||||
.objs
|
||||
pgsql-sync.so
|
||||
assets.h
|
||||
cert
|
11
examples/pgsql-sync/conf/pgsql-sync.conf
Normal file
11
examples/pgsql-sync/conf/pgsql-sync.conf
Normal file
@ -0,0 +1,11 @@
|
||||
# Placeholder configuration
|
||||
|
||||
bind 127.0.0.1 8888
|
||||
load ./pgsql-sync.so init
|
||||
tls_dhparam dh2048.pem
|
||||
|
||||
domain 127.0.0.1 {
|
||||
certfile cert/server.crt
|
||||
certkey cert/server.key
|
||||
static / page
|
||||
}
|
92
examples/pgsql-sync/src/pgsql-sync.c
Normal file
92
examples/pgsql-sync/src/pgsql-sync.c
Normal file
@ -0,0 +1,92 @@
|
||||
/*
|
||||
* Copyright (c) 2015 Joris Vink <joris@coders.se>
|
||||
*
|
||||
* Permission to use, copy, modify, and distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
* copyright notice and this permission notice appear in all copies.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
|
||||
/*
|
||||
* This example demonstrates how to use synchronous PGSQL queries
|
||||
* with Kore. For an asynchronous example see pgsql/ under examples/.
|
||||
*
|
||||
* This example does the same as the asynchronous one, select all entries
|
||||
* from a table called "coders".
|
||||
*/
|
||||
|
||||
#include <kore/kore.h>
|
||||
#include <kore/http.h>
|
||||
#include <kore/pgsql.h>
|
||||
|
||||
int init(int);
|
||||
int page(struct http_request *);
|
||||
|
||||
/* Called when our module is loaded (see config) */
|
||||
int
|
||||
init(int state)
|
||||
{
|
||||
/* Register our database. */
|
||||
kore_pgsql_register("db", "host=/tmp dbname=test");
|
||||
|
||||
return (KORE_RESULT_OK);
|
||||
}
|
||||
|
||||
/* Page handler entry point (see config) */
|
||||
int
|
||||
page(struct http_request *req)
|
||||
{
|
||||
struct kore_pgsql sql;
|
||||
char *name;
|
||||
int rows, i;
|
||||
|
||||
req->status = HTTP_STATUS_INTERNAL_ERROR;
|
||||
|
||||
/*
|
||||
* Initialise our kore_pgsql data structure with the database name
|
||||
* we want to connect to (note that we registered this earlier with
|
||||
* kore_pgsql_register()). We also say we will perform a synchronous
|
||||
* query (KORE_PGSQL_SYNC) and we do not need to pass our http_request
|
||||
* so we pass NULL instead.
|
||||
*/
|
||||
if (!kore_pgsql_query_init(&sql, NULL, "db", KORE_PGSQL_SYNC)) {
|
||||
kore_pgsql_logerror(&sql);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/*
|
||||
* Now we can fire off the query, once it returns we either have
|
||||
* a result on which we can operate or an error occured.
|
||||
*/
|
||||
if (!kore_pgsql_query(&sql, "SELECT * FROM coders")) {
|
||||
kore_pgsql_logerror(&sql);
|
||||
goto out;
|
||||
}
|
||||
|
||||
/*
|
||||
* Iterate over the result and dump it to somewhere.
|
||||
*/
|
||||
rows = kore_pgsql_ntuples(&sql);
|
||||
for (i = 0; i < rows; i++) {
|
||||
name = kore_pgsql_getvalue(&sql, i, 0);
|
||||
kore_log(LOG_NOTICE, "name: '%s'", name);
|
||||
}
|
||||
|
||||
/* All good. */
|
||||
req->status = HTTP_STATUS_OK;
|
||||
|
||||
out:
|
||||
http_response(req, req->status, NULL, 0);
|
||||
|
||||
/* Don't forget to cleanup the kore_pgsql data structure. */
|
||||
kore_pgsql_cleanup(&sql);
|
||||
|
||||
return (KORE_RESULT_OK);
|
||||
}
|
@ -16,7 +16,8 @@
|
||||
|
||||
/*
|
||||
* This example demonstrates on how to use state machines and
|
||||
* asynchronous pgsql queries.
|
||||
* asynchronous pgsql queries. For a synchronous query example
|
||||
* see the pgsql-sync/ example under the examples/ directory.
|
||||
*
|
||||
* While this example might seem overly complex for a simple pgsql
|
||||
* query, there is a reason behind its complexity:
|
||||
@ -36,15 +37,17 @@
|
||||
#include <kore/http.h>
|
||||
#include <kore/pgsql.h>
|
||||
|
||||
#define REQ_STATE_QUERY 0
|
||||
#define REQ_STATE_DB_WAIT 1
|
||||
#define REQ_STATE_DB_READ 2
|
||||
#define REQ_STATE_ERROR 3
|
||||
#define REQ_STATE_DONE 4
|
||||
#define REQ_STATE_INIT 0
|
||||
#define REQ_STATE_QUERY 1
|
||||
#define REQ_STATE_DB_WAIT 2
|
||||
#define REQ_STATE_DB_READ 3
|
||||
#define REQ_STATE_ERROR 4
|
||||
#define REQ_STATE_DONE 5
|
||||
|
||||
int init(int);
|
||||
int page(struct http_request *);
|
||||
|
||||
static int request_perform_init(struct http_request *);
|
||||
static int request_perform_query(struct http_request *);
|
||||
static int request_db_wait(struct http_request *);
|
||||
static int request_db_read(struct http_request *);
|
||||
@ -52,6 +55,7 @@ static int request_error(struct http_request *);
|
||||
static int request_done(struct http_request *);
|
||||
|
||||
struct http_state mystates[] = {
|
||||
{ "REQ_STATE_INIT", request_perform_init },
|
||||
{ "REQ_STATE_QUERY", request_perform_query },
|
||||
{ "REQ_STATE_DB_WAIT", request_db_wait },
|
||||
{ "REQ_STATE_DB_READ", request_db_read },
|
||||
@ -62,6 +66,7 @@ struct http_state mystates[] = {
|
||||
#define mystates_size (sizeof(mystates) / sizeof(mystates[0]))
|
||||
|
||||
struct rstate {
|
||||
int cnt;
|
||||
struct kore_pgsql sql;
|
||||
};
|
||||
|
||||
@ -69,8 +74,8 @@ struct rstate {
|
||||
int
|
||||
init(int state)
|
||||
{
|
||||
/* Set our connection string. */
|
||||
pgsql_conn_string = "host=/var/run/postgresql/ dbname=test";
|
||||
/* Register our database. */
|
||||
kore_pgsql_register("db", "host=/tmp dbname=test");
|
||||
|
||||
return (KORE_RESULT_OK);
|
||||
}
|
||||
@ -84,29 +89,48 @@ page(struct http_request *req)
|
||||
return (http_state_run(mystates, mystates_size, req));
|
||||
}
|
||||
|
||||
/* The initial state, we setup our context and fire off the pgsql query. */
|
||||
/* Initialize our PGSQL data structure and prepare for an async query. */
|
||||
int
|
||||
request_perform_query(struct http_request *req)
|
||||
request_perform_init(struct http_request *req)
|
||||
{
|
||||
struct rstate *state;
|
||||
|
||||
/* Setup our state context. */
|
||||
state = kore_malloc(sizeof(*state));
|
||||
/* Setup our state context (if not yet set). */
|
||||
if (req->hdlr_extra == NULL) {
|
||||
state = kore_malloc(sizeof(*state));
|
||||
req->hdlr_extra = state;
|
||||
} else {
|
||||
state = req->hdlr_extra;
|
||||
}
|
||||
|
||||
/* Attach the state to our request. */
|
||||
req->hdlr_extra = state;
|
||||
/* Initialize our kore_pgsql data structure. */
|
||||
if (!kore_pgsql_query_init(&state->sql, req, "db", KORE_PGSQL_ASYNC)) {
|
||||
/* If the state was still INIT, we'll try again later. */
|
||||
if (state->sql.state == KORE_PGSQL_STATE_INIT) {
|
||||
req->fsm_state = REQ_STATE_INIT;
|
||||
return (HTTP_STATE_RETRY);
|
||||
}
|
||||
|
||||
kore_pgsql_logerror(&state->sql);
|
||||
req->fsm_state = REQ_STATE_ERROR;
|
||||
} else {
|
||||
req->fsm_state = REQ_STATE_QUERY;
|
||||
}
|
||||
|
||||
return (HTTP_STATE_CONTINUE);
|
||||
}
|
||||
|
||||
/* After setting everything up we will execute our async query. */
|
||||
int
|
||||
request_perform_query(struct http_request *req)
|
||||
{
|
||||
struct rstate *state = req->hdlr_extra;
|
||||
|
||||
/* We want to move to read result after this. */
|
||||
req->fsm_state = REQ_STATE_DB_WAIT;
|
||||
|
||||
/* Fire off the query. */
|
||||
if (!kore_pgsql_query(&state->sql, req, "SELECT * FROM coders")) {
|
||||
/* If the state was still INIT, we'll try again later. */
|
||||
if (state->sql.state == KORE_PGSQL_STATE_INIT) {
|
||||
req->fsm_state = REQ_STATE_QUERY;
|
||||
return (HTTP_STATE_RETRY);
|
||||
}
|
||||
|
||||
if (!kore_pgsql_query(&state->sql, "SELECT * FROM coders")) {
|
||||
/*
|
||||
* Let the state machine continue immediately since we
|
||||
* have an error anyway.
|
||||
|
@ -22,6 +22,9 @@
|
||||
#define KORE_PGSQL_FORMAT_TEXT 0
|
||||
#define KORE_PGSQL_FORMAT_BINARY 1
|
||||
|
||||
#define KORE_PGSQL_SYNC 0x0001
|
||||
#define KORE_PGSQL_ASYNC 0x0002
|
||||
|
||||
#if defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
@ -29,14 +32,23 @@ extern "C" {
|
||||
struct pgsql_conn {
|
||||
u_int8_t type;
|
||||
u_int8_t flags;
|
||||
char *name;
|
||||
|
||||
PGconn *db;
|
||||
struct pgsql_job *job;
|
||||
TAILQ_ENTRY(pgsql_conn) list;
|
||||
};
|
||||
|
||||
struct pgsql_db {
|
||||
char *name;
|
||||
char *conn_string;
|
||||
|
||||
LIST_ENTRY(pgsql_db) rlist;
|
||||
};
|
||||
|
||||
struct kore_pgsql {
|
||||
u_int8_t state;
|
||||
int flags;
|
||||
char *error;
|
||||
PGresult *result;
|
||||
struct pgsql_conn *conn;
|
||||
@ -45,17 +57,17 @@ struct kore_pgsql {
|
||||
};
|
||||
|
||||
extern u_int16_t pgsql_conn_max;
|
||||
extern char *pgsql_conn_string;
|
||||
|
||||
void kore_pgsql_init(void);
|
||||
int kore_pgsql_query_init(struct kore_pgsql *, struct http_request *,
|
||||
const char *, int);
|
||||
void kore_pgsql_handle(void *, int);
|
||||
void kore_pgsql_cleanup(struct kore_pgsql *);
|
||||
void kore_pgsql_continue(struct http_request *, struct kore_pgsql *);
|
||||
int kore_pgsql_query(struct kore_pgsql *, struct http_request *,
|
||||
const char *);
|
||||
int kore_pgsql_query_params(struct kore_pgsql *, struct http_request *,
|
||||
int kore_pgsql_query(struct kore_pgsql *, const char *);
|
||||
int kore_pgsql_query_params(struct kore_pgsql *,
|
||||
const char *, int, u_int8_t, ...);
|
||||
|
||||
int kore_pgsql_register(const char *, const char *);
|
||||
int kore_pgsql_ntuples(struct kore_pgsql *);
|
||||
void kore_pgsql_logerror(struct kore_pgsql *);
|
||||
void kore_pgsql_queue_remove(struct http_request *);
|
||||
|
273
src/pgsql.c
273
src/pgsql.c
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2014 Joris Vink <joris@coders.se>
|
||||
* Copyright (c) 2014-2015 Joris Vink <joris@coders.se>
|
||||
*
|
||||
* Permission to use, copy, modify, and distribute this software for any
|
||||
* purpose with or without fee is hereby granted, provided that the above
|
||||
@ -25,7 +25,6 @@
|
||||
#include "pgsql.h"
|
||||
|
||||
struct pgsql_job {
|
||||
char *query;
|
||||
struct http_request *req;
|
||||
struct kore_pgsql *pgsql;
|
||||
|
||||
@ -37,28 +36,30 @@ struct pgsql_wait {
|
||||
TAILQ_ENTRY(pgsql_wait) list;
|
||||
};
|
||||
|
||||
#define PGSQL_IS_BLOCKING 0
|
||||
#define PGSQL_IS_ASYNC 1
|
||||
|
||||
#define PGSQL_CONN_MAX 2
|
||||
#define PGSQL_CONN_FREE 0x01
|
||||
#define PGSQL_LIST_INSERTED 0x0100
|
||||
|
||||
static void pgsql_queue_wakeup(void);
|
||||
static int pgsql_conn_create(struct kore_pgsql *);
|
||||
static void pgsql_set_error(struct kore_pgsql *, const char *);
|
||||
static void pgsql_queue_add(struct http_request *);
|
||||
static void pgsql_conn_release(struct kore_pgsql *);
|
||||
static void pgsql_conn_cleanup(struct pgsql_conn *);
|
||||
static void pgsql_read_result(struct kore_pgsql *, int);
|
||||
static void pgsql_schedule(struct kore_pgsql *, struct http_request *);
|
||||
static int pgsql_prepare(struct kore_pgsql *, struct http_request *,
|
||||
const char *);
|
||||
static void pgsql_read_result(struct kore_pgsql *);
|
||||
static void pgsql_schedule(struct kore_pgsql *);
|
||||
|
||||
static struct pgsql_conn *pgsql_conn_create(struct kore_pgsql *,
|
||||
struct pgsql_db *);
|
||||
static struct pgsql_conn *pgsql_conn_next(struct kore_pgsql *,
|
||||
struct pgsql_db *,
|
||||
struct http_request *);
|
||||
|
||||
static struct kore_pool pgsql_job_pool;
|
||||
static struct kore_pool pgsql_wait_pool;
|
||||
static TAILQ_HEAD(, pgsql_conn) pgsql_conn_free;
|
||||
static TAILQ_HEAD(, pgsql_wait) pgsql_wait_queue;
|
||||
static LIST_HEAD(, pgsql_db) pgsql_db_conn_strings;
|
||||
static u_int16_t pgsql_conn_count;
|
||||
char *pgsql_conn_string = NULL;
|
||||
u_int16_t pgsql_conn_max = PGSQL_CONN_MAX;
|
||||
|
||||
void
|
||||
@ -67,6 +68,7 @@ kore_pgsql_init(void)
|
||||
pgsql_conn_count = 0;
|
||||
TAILQ_INIT(&pgsql_conn_free);
|
||||
TAILQ_INIT(&pgsql_wait_queue);
|
||||
LIST_INIT(&pgsql_db_conn_strings);
|
||||
|
||||
kore_pool_init(&pgsql_job_pool, "pgsql_job_pool",
|
||||
sizeof(struct pgsql_job), 100);
|
||||
@ -75,32 +77,91 @@ kore_pgsql_init(void)
|
||||
}
|
||||
|
||||
int
|
||||
kore_pgsql_query(struct kore_pgsql *pgsql, struct http_request *req,
|
||||
const char *query)
|
||||
kore_pgsql_query_init(struct kore_pgsql *pgsql, struct http_request *req,
|
||||
const char *dbname, int flags)
|
||||
{
|
||||
if (!pgsql_prepare(pgsql, req, query))
|
||||
return (KORE_RESULT_ERROR);
|
||||
struct pgsql_db *db;
|
||||
|
||||
if (!PQsendQuery(pgsql->conn->db, query)) {
|
||||
pgsql_conn_cleanup(pgsql->conn);
|
||||
memset(pgsql, 0, sizeof(*pgsql));
|
||||
pgsql->flags = flags;
|
||||
pgsql->state = KORE_PGSQL_STATE_INIT;
|
||||
|
||||
if ((req == NULL && (flags & KORE_PGSQL_ASYNC)) ||
|
||||
((flags & KORE_PGSQL_ASYNC) && (flags & KORE_PGSQL_SYNC))) {
|
||||
pgsql_set_error(pgsql, "invalid query init parameters");
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
pgsql_schedule(pgsql, req);
|
||||
db = NULL;
|
||||
LIST_FOREACH(db, &pgsql_db_conn_strings, rlist) {
|
||||
if (!strcmp(db->name, dbname))
|
||||
break;
|
||||
}
|
||||
|
||||
if (db == NULL) {
|
||||
pgsql_set_error(pgsql, "no database found");
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
if ((pgsql->conn = pgsql_conn_next(pgsql, db, req)) == NULL)
|
||||
return (KORE_RESULT_ERROR);
|
||||
|
||||
if (pgsql->flags & KORE_PGSQL_ASYNC) {
|
||||
pgsql->conn->job = kore_pool_get(&pgsql_job_pool);
|
||||
pgsql->conn->job->req = req;
|
||||
pgsql->conn->job->pgsql = pgsql;
|
||||
|
||||
http_request_sleep(req);
|
||||
pgsql->flags |= PGSQL_LIST_INSERTED;
|
||||
LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist);
|
||||
}
|
||||
|
||||
return (KORE_RESULT_OK);
|
||||
}
|
||||
|
||||
int
|
||||
kore_pgsql_query_params(struct kore_pgsql *pgsql, struct http_request *req,
|
||||
kore_pgsql_query(struct kore_pgsql *pgsql, const char *query)
|
||||
{
|
||||
if (pgsql->conn == NULL) {
|
||||
pgsql_set_error(pgsql, "no connection was set before query");
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
if (pgsql->flags & KORE_PGSQL_SYNC) {
|
||||
pgsql->result = PQexec(pgsql->conn->db, query);
|
||||
|
||||
if ((PQresultStatus(pgsql->result) != PGRES_TUPLES_OK) &&
|
||||
(PQresultStatus(pgsql->result) != PGRES_COMMAND_OK)) {
|
||||
pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
pgsql->state = KORE_PGSQL_STATE_DONE;
|
||||
} else {
|
||||
if (!PQsendQuery(pgsql->conn->db, query)) {
|
||||
pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
pgsql_schedule(pgsql);
|
||||
}
|
||||
|
||||
return (KORE_RESULT_OK);
|
||||
}
|
||||
|
||||
int
|
||||
kore_pgsql_query_params(struct kore_pgsql *pgsql,
|
||||
const char *query, int result, u_int8_t count, ...)
|
||||
{
|
||||
u_int8_t i;
|
||||
va_list args;
|
||||
char **values;
|
||||
int *lengths, *formats;
|
||||
int *lengths, *formats, ret;
|
||||
|
||||
if (!pgsql_prepare(pgsql, req, query))
|
||||
if (pgsql->conn == NULL) {
|
||||
pgsql_set_error(pgsql, "no connection was set before query");
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
if (count > 0) {
|
||||
va_start(args, count);
|
||||
@ -120,20 +181,55 @@ kore_pgsql_query_params(struct kore_pgsql *pgsql, struct http_request *req,
|
||||
values = NULL;
|
||||
}
|
||||
|
||||
if (!PQsendQueryParams(pgsql->conn->db, query, count, NULL,
|
||||
(const char * const *)values, lengths, formats, result)) {
|
||||
kore_mem_free(values);
|
||||
kore_mem_free(lengths);
|
||||
kore_mem_free(formats);
|
||||
pgsql_conn_cleanup(pgsql->conn);
|
||||
return (KORE_RESULT_ERROR);
|
||||
ret = KORE_RESULT_ERROR;
|
||||
|
||||
if (pgsql->flags & KORE_PGSQL_SYNC) {
|
||||
pgsql->result = PQexecParams(pgsql->conn->db, query, count,
|
||||
NULL, (const char * const *)values, lengths, formats,
|
||||
result);
|
||||
|
||||
if ((PQresultStatus(pgsql->result) != PGRES_TUPLES_OK) &&
|
||||
(PQresultStatus(pgsql->result) != PGRES_COMMAND_OK)) {
|
||||
pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
pgsql->state = KORE_PGSQL_STATE_DONE;
|
||||
} else {
|
||||
if (!PQsendQueryParams(pgsql->conn->db, query, count, NULL,
|
||||
(const char * const *)values, lengths, formats, result)) {
|
||||
pgsql_set_error(pgsql, PQerrorMessage(pgsql->conn->db));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
pgsql_schedule(pgsql);
|
||||
}
|
||||
|
||||
ret = KORE_RESULT_OK;
|
||||
|
||||
cleanup:
|
||||
kore_mem_free(values);
|
||||
kore_mem_free(lengths);
|
||||
kore_mem_free(formats);
|
||||
|
||||
pgsql_schedule(pgsql, req);
|
||||
return (ret);
|
||||
}
|
||||
|
||||
int
|
||||
kore_pgsql_register(const char *dbname, const char *connstring)
|
||||
{
|
||||
struct pgsql_db *pgsqldb;
|
||||
|
||||
LIST_FOREACH(pgsqldb, &pgsql_db_conn_strings, rlist) {
|
||||
if (!strcmp(pgsqldb->name, dbname))
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
pgsqldb = kore_malloc(sizeof(*pgsqldb));
|
||||
pgsqldb->name = kore_strdup(dbname);
|
||||
pgsqldb->conn_string = kore_strdup(connstring);
|
||||
LIST_INSERT_HEAD(&pgsql_db_conn_strings, pgsqldb, rlist);
|
||||
|
||||
return (KORE_RESULT_OK);
|
||||
}
|
||||
|
||||
@ -157,7 +253,7 @@ kore_pgsql_handle(void *c, int err)
|
||||
pgsql->state = KORE_PGSQL_STATE_ERROR;
|
||||
pgsql->error = kore_strdup(PQerrorMessage(conn->db));
|
||||
} else {
|
||||
pgsql_read_result(pgsql, PGSQL_IS_ASYNC);
|
||||
pgsql_read_result(pgsql);
|
||||
}
|
||||
|
||||
if (pgsql->state == KORE_PGSQL_STATE_WAIT) {
|
||||
@ -218,7 +314,10 @@ kore_pgsql_cleanup(struct kore_pgsql *pgsql)
|
||||
pgsql->error = NULL;
|
||||
pgsql->conn = NULL;
|
||||
|
||||
LIST_REMOVE(pgsql, rlist);
|
||||
if (pgsql->flags & PGSQL_LIST_INSERTED) {
|
||||
LIST_REMOVE(pgsql, rlist);
|
||||
pgsql->flags &= ~PGSQL_LIST_INSERTED;
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
@ -262,47 +361,55 @@ kore_pgsql_queue_remove(struct http_request *req)
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
pgsql_prepare(struct kore_pgsql *pgsql, struct http_request *req,
|
||||
const char *query)
|
||||
static struct pgsql_conn *
|
||||
pgsql_conn_next(struct kore_pgsql *pgsql, struct pgsql_db *db,
|
||||
struct http_request *req)
|
||||
{
|
||||
struct pgsql_conn *conn;
|
||||
|
||||
pgsql->state = KORE_PGSQL_STATE_INIT;
|
||||
pgsql->result = NULL;
|
||||
pgsql->error = NULL;
|
||||
pgsql->conn = NULL;
|
||||
conn = NULL;
|
||||
|
||||
if (TAILQ_EMPTY(&pgsql_conn_free)) {
|
||||
if (pgsql_conn_count >= pgsql_conn_max) {
|
||||
pgsql_queue_add(req);
|
||||
return (KORE_RESULT_ERROR);
|
||||
}
|
||||
|
||||
if (!pgsql_conn_create(pgsql))
|
||||
return (KORE_RESULT_ERROR);
|
||||
TAILQ_FOREACH(conn, &pgsql_conn_free, list) {
|
||||
if (!(conn->flags & PGSQL_CONN_FREE))
|
||||
fatal("got a pgsql connection that was not free?");
|
||||
if (!strcmp(conn->name, db->name))
|
||||
break;
|
||||
}
|
||||
|
||||
http_request_sleep(req);
|
||||
conn = TAILQ_FIRST(&pgsql_conn_free);
|
||||
if (!(conn->flags & PGSQL_CONN_FREE))
|
||||
fatal("received a pgsql conn that was not free?");
|
||||
if (conn == NULL) {
|
||||
if (pgsql_conn_count >= pgsql_conn_max) {
|
||||
if (pgsql->flags & KORE_PGSQL_ASYNC) {
|
||||
pgsql_queue_add(req);
|
||||
} else {
|
||||
pgsql_set_error(pgsql,
|
||||
"no available connection");
|
||||
}
|
||||
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
if ((conn = pgsql_conn_create(pgsql, db)) == NULL)
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
conn->flags &= ~PGSQL_CONN_FREE;
|
||||
TAILQ_REMOVE(&pgsql_conn_free, conn, list);
|
||||
|
||||
pgsql->conn = conn;
|
||||
conn->job = kore_pool_get(&pgsql_job_pool);
|
||||
conn->job->query = kore_strdup(query);
|
||||
conn->job->pgsql = pgsql;
|
||||
conn->job->req = req;
|
||||
|
||||
LIST_INSERT_HEAD(&(req->pgsqls), pgsql, rlist);
|
||||
return (KORE_RESULT_OK);
|
||||
return (conn);
|
||||
}
|
||||
|
||||
static void
|
||||
pgsql_schedule(struct kore_pgsql *pgsql, struct http_request *req)
|
||||
pgsql_set_error(struct kore_pgsql *pgsql, const char *msg)
|
||||
{
|
||||
if (pgsql->error != NULL)
|
||||
kore_mem_free(pgsql->error);
|
||||
|
||||
pgsql->error = kore_strdup(msg);
|
||||
pgsql->state = KORE_PGSQL_STATE_ERROR;
|
||||
}
|
||||
|
||||
static void
|
||||
pgsql_schedule(struct kore_pgsql *pgsql)
|
||||
{
|
||||
int fd;
|
||||
|
||||
@ -347,33 +454,32 @@ pgsql_queue_wakeup(void)
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
pgsql_conn_create(struct kore_pgsql *pgsql)
|
||||
static struct pgsql_conn *
|
||||
pgsql_conn_create(struct kore_pgsql *pgsql, struct pgsql_db *db)
|
||||
{
|
||||
struct pgsql_conn *conn;
|
||||
|
||||
if (pgsql_conn_string == NULL)
|
||||
if (db == NULL || db->conn_string == NULL)
|
||||
fatal("pgsql_conn_create: no connection string");
|
||||
|
||||
pgsql_conn_count++;
|
||||
conn = kore_malloc(sizeof(*conn));
|
||||
kore_debug("pgsql_conn_create(): %p", conn);
|
||||
memset(conn, 0, sizeof(*conn));
|
||||
|
||||
conn->db = PQconnectdb(pgsql_conn_string);
|
||||
conn->db = PQconnectdb(db->conn_string);
|
||||
if (conn->db == NULL || (PQstatus(conn->db) != CONNECTION_OK)) {
|
||||
pgsql->state = KORE_PGSQL_STATE_ERROR;
|
||||
pgsql->error = kore_strdup(PQerrorMessage(conn->db));
|
||||
pgsql_set_error(pgsql, PQerrorMessage(conn->db));
|
||||
pgsql_conn_cleanup(conn);
|
||||
return (KORE_RESULT_ERROR);
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
conn->job = NULL;
|
||||
conn->flags = PGSQL_CONN_FREE;
|
||||
conn->type = KORE_TYPE_PGSQL_CONN;
|
||||
conn->name = kore_strdup(db->name);
|
||||
TAILQ_INSERT_TAIL(&pgsql_conn_free, conn, list);
|
||||
|
||||
return (KORE_RESULT_OK);
|
||||
return (conn);
|
||||
}
|
||||
|
||||
static void
|
||||
@ -384,8 +490,14 @@ pgsql_conn_release(struct kore_pgsql *pgsql)
|
||||
if (pgsql->conn == NULL)
|
||||
return;
|
||||
|
||||
kore_mem_free(pgsql->conn->job->query);
|
||||
kore_pool_put(&pgsql_job_pool, pgsql->conn->job);
|
||||
/* Async query cleanup */
|
||||
if (pgsql->flags & KORE_PGSQL_ASYNC) {
|
||||
if (pgsql->conn != NULL) {
|
||||
fd = PQsocket(pgsql->conn->db);
|
||||
kore_platform_disable_read(fd);
|
||||
kore_pool_put(&pgsql_job_pool, pgsql->conn->job);
|
||||
}
|
||||
}
|
||||
|
||||
/* Drain just in case. */
|
||||
while (PQgetResult(pgsql->conn->db) != NULL)
|
||||
@ -395,9 +507,6 @@ pgsql_conn_release(struct kore_pgsql *pgsql)
|
||||
pgsql->conn->flags |= PGSQL_CONN_FREE;
|
||||
TAILQ_INSERT_TAIL(&pgsql_conn_free, pgsql->conn, list);
|
||||
|
||||
fd = PQsocket(pgsql->conn->db);
|
||||
kore_platform_disable_read(fd);
|
||||
|
||||
pgsql->conn = NULL;
|
||||
pgsql->state = KORE_PGSQL_STATE_COMPLETE;
|
||||
|
||||
@ -421,10 +530,8 @@ pgsql_conn_cleanup(struct pgsql_conn *conn)
|
||||
http_request_wakeup(req);
|
||||
|
||||
pgsql->conn = NULL;
|
||||
pgsql->state = KORE_PGSQL_STATE_ERROR;
|
||||
pgsql->error = kore_strdup(PQerrorMessage(conn->db));
|
||||
pgsql_set_error(pgsql, PQerrorMessage(conn->db));
|
||||
|
||||
kore_mem_free(conn->job->query);
|
||||
kore_pool_put(&pgsql_job_pool, conn->job);
|
||||
conn->job = NULL;
|
||||
}
|
||||
@ -433,17 +540,16 @@ pgsql_conn_cleanup(struct pgsql_conn *conn)
|
||||
PQfinish(conn->db);
|
||||
|
||||
pgsql_conn_count--;
|
||||
kore_mem_free(conn->name);
|
||||
kore_mem_free(conn);
|
||||
}
|
||||
|
||||
static void
|
||||
pgsql_read_result(struct kore_pgsql *pgsql, int async)
|
||||
pgsql_read_result(struct kore_pgsql *pgsql)
|
||||
{
|
||||
if (async) {
|
||||
if (PQisBusy(pgsql->conn->db)) {
|
||||
pgsql->state = KORE_PGSQL_STATE_WAIT;
|
||||
return;
|
||||
}
|
||||
if (PQisBusy(pgsql->conn->db)) {
|
||||
pgsql->state = KORE_PGSQL_STATE_WAIT;
|
||||
return;
|
||||
}
|
||||
|
||||
pgsql->result = PQgetResult(pgsql->conn->db);
|
||||
@ -470,8 +576,7 @@ pgsql_read_result(struct kore_pgsql *pgsql, int async)
|
||||
case PGRES_EMPTY_QUERY:
|
||||
case PGRES_BAD_RESPONSE:
|
||||
case PGRES_FATAL_ERROR:
|
||||
pgsql->state = KORE_PGSQL_STATE_ERROR;
|
||||
pgsql->error = kore_strdup(PQresultErrorMessage(pgsql->result));
|
||||
pgsql_set_error(pgsql, PQresultErrorMessage(pgsql->result));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user