pyqueue improvements.

- cleanup queue if it gets deallocated.
- make sure waitables on a queue get removed if their pyqueue_op dies.
This commit is contained in:
Joris Vink 2018-10-21 21:58:34 +02:00
parent 4ae3d23c7e
commit c8795b7d7f
2 changed files with 29 additions and 9 deletions

View File

@ -151,6 +151,7 @@ static PyTypeObject pysocket_op_type = {
struct pyqueue_waiting { struct pyqueue_waiting {
struct python_coro *coro; struct python_coro *coro;
struct pyqueue_op *op;
TAILQ_ENTRY(pyqueue_waiting) list; TAILQ_ENTRY(pyqueue_waiting) list;
}; };
@ -189,6 +190,7 @@ static PyTypeObject pyqueue_type = {
struct pyqueue_op { struct pyqueue_op {
PyObject_HEAD PyObject_HEAD
struct pyqueue *queue; struct pyqueue *queue;
struct pyqueue_waiting *waiting;
}; };
static void pyqueue_op_dealloc(struct pyqueue_op *); static void pyqueue_op_dealloc(struct pyqueue_op *);

View File

@ -1198,7 +1198,6 @@ pysocket_op_create(struct pysocket *sock, int type, const void *ptr, size_t len)
op->data.evt.handle = pysocket_evt_handle; op->data.evt.handle = pysocket_evt_handle;
Py_INCREF(op->data.socket); Py_INCREF(op->data.socket);
Py_INCREF(op->data.coro->obj);
switch (type) { switch (type) {
case PYSOCKET_TYPE_RECV: case PYSOCKET_TYPE_RECV:
@ -1408,6 +1407,22 @@ pysocket_evt_handle(void *arg, int error)
static void static void
pyqueue_dealloc(struct pyqueue *queue) pyqueue_dealloc(struct pyqueue *queue)
{ {
struct pyqueue_object *object;
struct pyqueue_waiting *waiting;
while ((object = TAILQ_FIRST(&queue->objects)) != NULL) {
TAILQ_REMOVE(&queue->objects, object, list);
Py_DECREF(object->obj);
kore_pool_put(&queue_object_pool, object);
}
while ((waiting = TAILQ_FIRST(&queue->waiting)) != NULL) {
TAILQ_REMOVE(&queue->waiting, waiting, list);
if (waiting->op != NULL)
waiting->op->waiting = NULL;
kore_pool_put(&queue_wait_pool, waiting);
}
PyObject_Del((PyObject *)queue); PyObject_Del((PyObject *)queue);
} }
@ -1415,20 +1430,18 @@ static PyObject *
pyqueue_pop(struct pyqueue *queue, PyObject *args) pyqueue_pop(struct pyqueue *queue, PyObject *args)
{ {
struct pyqueue_op *op; struct pyqueue_op *op;
struct pyqueue_waiting *waiting;
if ((op = PyObject_New(struct pyqueue_op, &pyqueue_op_type)) == NULL) if ((op = PyObject_New(struct pyqueue_op, &pyqueue_op_type)) == NULL)
return (NULL); return (NULL);
op->queue = queue; op->queue = queue;
op->waiting = kore_pool_get(&queue_wait_pool);
op->waiting->op = op;
waiting = kore_pool_get(&queue_wait_pool); op->waiting->coro = coro_running;
TAILQ_INSERT_TAIL(&queue->waiting, op->waiting, list);
waiting->coro = coro_running;
TAILQ_INSERT_TAIL(&queue->waiting, waiting, list);
Py_INCREF((PyObject *)queue); Py_INCREF((PyObject *)queue);
Py_INCREF(waiting->coro->obj);
return ((PyObject *)op); return ((PyObject *)op);
} }
@ -1460,7 +1473,7 @@ pyqueue_push(struct pyqueue *queue, PyObject *args)
else else
python_coro_wakeup(waiting->coro); python_coro_wakeup(waiting->coro);
Py_DECREF(waiting->coro->obj); waiting->op->waiting = NULL;
kore_pool_put(&queue_wait_pool, waiting); kore_pool_put(&queue_wait_pool, waiting);
} }
@ -1470,6 +1483,12 @@ pyqueue_push(struct pyqueue *queue, PyObject *args)
static void static void
pyqueue_op_dealloc(struct pyqueue_op *op) pyqueue_op_dealloc(struct pyqueue_op *op)
{ {
if (op->waiting != NULL) {
TAILQ_REMOVE(&op->queue->waiting, op->waiting, list);
kore_pool_put(&queue_wait_pool, op->waiting);
op->waiting = NULL;
}
Py_DECREF((PyObject *)op->queue); Py_DECREF((PyObject *)op->queue);
PyObject_Del((PyObject *)op); PyObject_Del((PyObject *)op);
} }
@ -1500,7 +1519,6 @@ pyqueue_op_iternext(struct pyqueue_op *op)
TAILQ_FOREACH(waiting, &op->queue->waiting, list) { TAILQ_FOREACH(waiting, &op->queue->waiting, list) {
if (waiting->coro == coro_running) { if (waiting->coro == coro_running) {
TAILQ_REMOVE(&op->queue->waiting, waiting, list); TAILQ_REMOVE(&op->queue->waiting, waiting, list);
Py_DECREF(waiting->coro->obj);
kore_pool_put(&queue_wait_pool, waiting); kore_pool_put(&queue_wait_pool, waiting);
break; break;
} }