#include <unistd.h>
#include <poll.h>
#include <errno.h>
+#include <limits.h>
#include <sys/socket.h>
#include "af_rxrpc.h"
#include "rxgen.h"
#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+uint32_t rxgen_dec_padding_sink;
+
/*
* dump the control messages
*/
call->magic = RXGEN_CALL_MAGIC;
call->conn = z_conn;
- call->more = 1;
+ call->more_send = true;
+ call->more_recv = true;
call->buffer_head = buf;
call->buffer_tail = buf;
call->data_start = data;
iov[ioc].iov_base = cursor->buf + io_cursor;
iov[ioc].iov_len = end - io_cursor;
if (cursor == call->buffer_tail) {
- if (!call->more)
+ if (!call->more_send)
more = 0;
ioc++;
break;
if (call->data_count > 0)
goto more_to_send;
- if (call->data_count == 0 && !call->more)
+ if (call->data_count == 0 && !call->more_send)
call->state++;
- if (!call->more) {
+ if (!call->more_send) {
if (call->state == rx_call_cl_encoding_params ||
call->state == rx_call_sv_encoding_response)
call->state++;
msg.msg_flags = 0;
ret = recvmsg(z_conn->fd, &msg, MSG_PEEK | (nowait ? MSG_DONTWAIT : 0));
- debug("RECVMSG: %d\n", ret);
+ debug("RECVMSG PEEK: %d\n", ret);
if (ret == -1)
return -1;
if (ret == -1)
return -1;
- debug("RECV: %d [fl:%d]\n", ret, msg.msg_flags);
+ debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags);
debug("CMSG: %zu\n", msg.msg_controllen);
debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len);
rxrpc_check_call(call);
for (cursor = call->buffer_head; cursor; cursor = cursor->next)
- debug("Recv buf=%p data=%p ioc=%u\n",
+ debug("RecvQ buf=%p data=%p iocur=%u\n",
cursor, cursor->buf, cursor->io_cursor);
/* Process the metadata */
if (msg.msg_flags & MSG_EOR)
call->known_to_kernel = 0;
if (!(msg.msg_flags & MSG_MORE))
- call->more = 0;
+ call->more_recv = false;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
unsigned char *p;
case rx_call_cl_decoding_response:
case rx_call_sv_decoding_opcode:
case rx_call_sv_decoding_params:
- if (call->data_count < call->need_size) {
+ if (call->need_size == UINT_MAX) {
+ /* A split mode call wants everything as it comes in,
+ * and is willing to settle for nothing also.
+ */
+ if (call->data_count <= 0 && call->more_recv)
+ return 0;
+ } else if (call->data_count < call->need_size) {
if (!(msg.msg_flags & MSG_MORE)) {
/* Short data */
- call->error_code = EMSGSIZE;
+ call->error_code = ENODATA;
rxrpc_abort_call(call, 1);
}
return 0;
unsigned char control[128];
if (call->known_to_kernel) {
+ memset(control, 0, sizeof(control));
ctrllen = 0;
RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
RXRPC_ADD_ABORT(control, ctrllen, abort_code);
free(cursor->buf);
free(cursor);
}
+ if (call->decoder_cleanup)
+ call->decoder_cleanup(call);
free(call);
}
}
}
-/*
- * Slow path for decoding from data read from the buffer list.
- */
-void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data)
-{
- struct rx_buf *cursor, *new;
- uint8_t *buf;
-
- rxrpc_check_call(call);
-
- if (call->error_code)
- return;
- if (call->data_cursor != call->data_stop)
- abort();
- rxrpc_post_enc(call);
-
- new = calloc(1, sizeof(struct rx_buf));
- if (!new)
- goto handle_oom;
- new->buf = buf = malloc(RXGEN_BUFFER_SIZE);
- if (!buf)
- goto handle_oom_buf;
- new->magic = RXGEN_BUF_MAGIC;
-
- *(net_xdr_t *)buf = data;
-
- cursor = call->buffer_tail;
- cursor->next = new;
- call->data_cursor = call->data_start = buf + sizeof(net_xdr_t);
- call->data_stop = buf + RXGEN_BUFFER_SIZE;
- call->buffer_space += RXGEN_BUFFER_SIZE - sizeof(net_xdr_t);
- call->buffer_tail = new;
- return;
-
-handle_oom_buf:
- new->magic = RXGEN_BUF_DEAD;
- free(new);
-handle_oom:
- call->error_code = ENOMEM;
-}
-
-/*
- * Slow path for decoding from data read from the buffer list.
- */
-uint32_t rxrpc_dec_slow(struct rx_call *call)
-{
- struct rx_buf *cursor, *spent;
- net_xdr_t x;
- unsigned count;
- uint8_t *new_stop;
-
- rxrpc_check_call(call);
- rxrpc_post_dec(call);
-
- /* More data may have come into a partially filled buffer from which we
- * previously read.
- */
- cursor = call->buffer_head;
- count = cursor->io_cursor & ~3;
- new_stop = cursor->buf + count;
- if (call->data_stop >= new_stop) {
- /* This buffer must then be completely used as we're required to check
- * amount received before reading it.
- */
- if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
- abort(); /* Didn't completely consume a buffer */
- if (call->buffer_tail == cursor)
- abort(); /* Unexpectedly out of data */
-
- /* Move to the next buffer */
- spent = cursor;
- cursor = cursor->next;
- call->buffer_head = cursor;
-
- call->data_cursor = call->data_start = cursor->buf;
- count = cursor->io_cursor & ~3;
- if (count == 0)
- abort();
- new_stop = cursor->buf + count;
- spent->magic = RXGEN_BUF_DEAD;
- free(spent->buf);
- free(spent);
- }
-
- call->data_stop = new_stop;
- x = *(net_xdr_t *)call->data_cursor;
- call->data_cursor += sizeof(x);
- return ntohl(x);
-}
-
/*
* Discard excess received data
*/
/*
* Encode a string of bytes
*/
-int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size)
+int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size)
{
struct rx_buf *cursor, *new;
uint8_t *buf;
}
/*
- * Decode a string of bytes
+ * Slow path for decoding from data read from the buffer list.
*/
-void rxrpc_dec_bytes(struct rx_call *call)
+void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data)
{
- struct rx_buf *cursor, *spent;
- unsigned needed, count;
- uint8_t *new_stop;
+ rxrpc_enc_blob(call, &data, sizeof(data));
+}
+
+/*
+ * Decode a blob.
+ *
+ * This works progressively and so may need to be called multiple times for any
+ * particular set.
+ *
+ * call->blob_size indicates how many bytes we need to extract in total and
+ * call->blob_offset indicates how many we have done so far. blob_offset is
+ * updated before returning.
+ */
+void rxrpc_dec_blob(struct rx_call *call)
+{
+ unsigned needed, segment;
rxrpc_check_call(call);
rxrpc_post_dec(call);
- needed = call->bulk_count - call->bulk_index;
- count = call->data_stop - call->data_cursor;
- if (count > 0) {
- if (count > needed)
- count = needed;
- memcpy(call->bulk_item + call->bulk_index, call->data_cursor, count);
- call->bulk_index += count;
- call->data_cursor += count;
- if (call->bulk_index >= call->bulk_count)
+ needed = call->blob_size - call->blob_offset;
+ segment = call->data_stop - call->data_cursor;
+ debug("DEC BLOB dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment);
+
+ if (segment > 0) {
+ if (segment > needed)
+ segment = needed;
+
+ debug("DEC BLOB copy %u\n", segment);
+ memcpy(call->blob + call->blob_offset, call->data_cursor, segment);
+ call->blob_decoded += segment;
+ call->blob_offset += segment;
+ call->data_cursor += segment;
+ if (call->blob_offset >= call->blob_size)
return;
}
+ rxrpc_dec_advance_buffer(call);
+}
+
+/*
+ * Progress the buffer chain during decoding.
+ */
+void rxrpc_dec_advance_buffer(struct rx_call *call)
+{
+ struct rx_buf *cursor, *spent;
+ unsigned segment;
+ uint8_t *new_stop;
+
/* More data may have come into a partially filled buffer from which we
* previously read.
*/
cursor = call->buffer_head;
- count = cursor->io_cursor & ~3;
- new_stop = cursor->buf + count;
+ segment = cursor->io_cursor;
+ new_stop = cursor->buf + segment;
if (call->data_stop >= new_stop) {
/* This buffer must then be completely used as we're required to check
* amount received before reading it.
abort(); /* Unexpectedly out of data */
/* Move to the next buffer */
+ rxrpc_post_dec(call);
spent = cursor;
cursor = cursor->next;
call->buffer_head = cursor;
call->data_cursor = call->data_start = cursor->buf;
- count = cursor->io_cursor & ~3;
- if (count == 0)
+ segment = cursor->io_cursor;
+ if (segment == 0)
abort();
- new_stop = cursor->buf + count;
+ new_stop = cursor->buf + segment;
spent->magic = RXGEN_BUF_DEAD;
free(spent->buf);
call->data_stop = new_stop;
rxrpc_check_call(call);
}
+
+/*
+ * Slow path for decoding a 4-byte integer read from the buffer list.
+ */
+uint32_t rxrpc_dec_slow(struct rx_call *call)
+{
+ net_xdr_t x;
+ uint32_t ret;
+
+ debug("DEC SLOW %lu %ld\n",
+ (unsigned long)call->data_cursor & 3,
+ call->data_cursor - call->data_stop);
+
+ call->blob_size = 4;
+ call->blob_offset = 0;
+ call->blob = &x;
+
+ /* We should only come here if there is sufficient data in the
+ * buffers for us to complete all the copy operation(s).
+ */
+ if (call->data_count < call->blob_size)
+ abort();
+
+ while (call->blob_offset < call->blob_size)
+ rxrpc_dec_blob(call);
+
+ ret = ntohl(x);
+ debug("DEC SLOW = %x\n", ret);
+ return ret;
+}
#include "afs_py.h"
#include "rxgen.h"
+#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+
+struct py_dec_index {
+ unsigned long val;
+ void *ptr;
+};
+
+struct py_dec_manager {
+ Py_buffer view;
+ struct py_dec_index top;
+ struct py_dec_index indices[];
+};
+
PyObject *py_rxgen_get_struct(const void *p, PyObject **cache,
PyObject *(*data_to_type)(const void *elem))
{
return 0;
}
-/*
- * Decode a string of bytes into a preallocated unicode string python object.
- */
-void py_dec_string(struct rx_call *call)
-{
- PyObject *str = call->bulk_item;
- struct rx_buf *cursor, *spent;
- unsigned needed, count;
- uint8_t *new_stop;
-
- if (PyUnicode_KIND(str) != PyUnicode_1BYTE_KIND)
- abort();
-
- rxrpc_post_dec(call);
-
- needed = call->bulk_count - call->bulk_index;
- count = call->data_stop - call->data_cursor;
- if (count > 0) {
- if (count > needed)
- count = needed;
- memcpy(PyUnicode_DATA(str) + call->bulk_index, call->data_cursor, count);
- call->bulk_index += count;
- call->data_cursor += count;
- rxrpc_dec_align(call);
- if (call->bulk_count <= call->bulk_index)
- return;
-
- }
-
- /* More data may have come into a partially filled buffer from which we
- * previously read.
- */
- cursor = call->buffer_head;
- count = cursor->io_cursor & ~3;
- new_stop = cursor->buf + count;
- if (call->data_stop >= new_stop) {
- /* This buffer must then be completely used as we're required to check
- * amount received before reading it.
- */
- if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
- abort(); /* Didn't completely consume a buffer */
- if (call->buffer_tail == cursor)
- abort(); /* Unexpectedly out of data */
-
- /* Move to the next buffer */
- spent = cursor;
- cursor = cursor->next;
- call->buffer_head = cursor;
-
- call->data_cursor = call->data_start = cursor->buf;
- count = cursor->io_cursor & ~3;
- if (count == 0)
- abort();
- new_stop = cursor->buf + count;
-
- free(spent->buf);
- free(spent);
- }
-
- call->data_stop = new_stop;
-}
-
/*
* Recursively encode a standard C array.
*/
static int py_enc_c_array(struct rx_call *call,
const void *data, int dim,
const Py_ssize_t *dim_size,
- Py_ssize_t itemsize,
- int align)
+ Py_ssize_t itemsize)
{
if (dim == 1) {
/* Data subarray */
- rxrpc_enc_bytes(call, data, *dim_size * itemsize);
- if (align)
- rxrpc_enc_align(call);
+ rxrpc_enc_blob(call, data, *dim_size * itemsize);
return rxrpc_post_enc(call);
} else {
/* Pointer subarray */
const void *const *ptrs = data;
Py_ssize_t i;
for (i = 0; i < *dim_size; i++)
- if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize,
- align) < 0)
+ if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0)
return -1;
return 0;
}
if (dim == 0)
/* Single data item */
- return rxrpc_enc_bytes(call, data, itemsize);
+ return rxrpc_enc_blob(call, data, itemsize);
for (i = 0; i < *dim_size; i++) {
if (py_enc_numpy_array(call, data + i * *dim_stride,
if (dim == 0)
/* Single data item */
- return rxrpc_enc_bytes(call, data, itemsize);
+ return rxrpc_enc_blob(call, data, itemsize);
for (i = 0; i < *dim_size; i++) {
const void *ptr = data + i * *dim_stride;
* Encode the just the contents of a python buffer view, without length or
* realignment.
*/
-int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align)
+int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim)
{
if (call->error_code)
return -1;
if (view->ndim == 0 || (view->ndim == 1 && !view->shape)) {
/* Simple scalar array */
- if (rxrpc_enc_bytes(call, view->buf, view->len) < 0)
+ if (rxrpc_enc_blob(call, view->buf, view->len) < 0)
return -1;
return rxrpc_post_enc(call);
}
if (!view->strides)
/* Standard C array */
- return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize,
- align);
+ return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize);
if (!view->suboffsets)
return py_enc_numpy_array(call, view->buf, view->ndim, view->shape, view->strides,
*/
int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim)
{
+ static unsigned zero;
int i;
if (call->error_code)
return -1;
if (0) {
- printf("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize);
+ debug("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize);
if (view->shape)
for (i = 0; i < view->ndim; i++)
- printf(" %zd", view->shape[i]);
- printf(" }\n");
+ debug(" %zd", view->shape[i]);
+ debug(" }\n");
}
if (view->len > dim) {
}
rxrpc_enc(call, view->len);
- if (py_enc_buffer_raw(call, view, dim, true) == -1)
+ if (py_enc_buffer_raw(call, view, dim) == -1)
return -1;
- rxrpc_enc_align(call);
+ if (view->len & 3)
+ rxrpc_enc_blob(call, &zero, 4 - (view->len & 3));
return rxrpc_post_enc(call);
}
/*
- * Recursively decode a standard C array.
+ * Recursively decode into a standard C array.
*/
-static int py_dec_c_array(struct rx_call *call,
- void *data, int dim,
- const Py_ssize_t *dim_size,
- Py_ssize_t itemsize)
+static int py_dec_c_array(struct rx_call *call, struct py_dec_manager *manager)
{
- if (dim == 1) {
- /* Data subarray */
- call->bulk_item = data;
- call->bulk_count = *dim_size * itemsize;
- call->bulk_index = 0;
- rxrpc_dec_bytes(call);
- return rxrpc_post_dec(call);
- } else {
- /* Pointer subarray */
- void **ptrs = data;
- Py_ssize_t i;
- for (i = 0; i < *dim_size; i++)
- if (py_dec_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0)
- return -1;
- return 0;
+ unsigned long val;
+ int i;
+
+ /* First of all, we finish off the blob we're currently decoding
+ * into.
+ */
+ if (call->blob_size < manager->view.itemsize) {
+ rxrpc_dec_blob(call);
+ if (rxrpc_post_dec(call) < 0)
+ return -1;
+ if (call->blob_size < manager->view.itemsize)
+ goto need_more;
+ }
+
+ /* Now we increment the per-dimension counters, overflowing and
+ * wrapping when a counter reaches the limit. When the outermost
+ * wraps, we are done.
+ */
+ for (i = manager->view.ndim - 1; i >= 0; i--) {
+ val = manager->indices[i].val;
+ if (val + 1 < manager->view.shape[i]) {
+ manager->indices[i].val = val + 1;
+ goto incremented;
+ }
+ manager->indices[i].val = 0;
}
+
+ return 0; /* Complete */
+
+incremented:
+ /* Recalculate the cached pointers */
+ for (; i <= manager->view.ndim - 1; i++) {
+ void **ptrs = manager->indices[i - 1].ptr;
+ void *ptr = ptrs[manager->indices[i].val];
+ manager->indices[i].ptr = ptr;
+ }
+
+ call->blob_offset = 0;
+ call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+ return 1;
}
/*
- * Recursively decode a NumPy-style array.
+ * Recursively decode into a NumPy-style array.
*/
-static int py_dec_numpy_array(struct rx_call *call,
- void *data, int dim,
- const Py_ssize_t *dim_size,
- const Py_ssize_t *dim_stride,
- Py_ssize_t itemsize)
+static int py_dec_numpy_array(struct rx_call *call, struct py_dec_manager *manager)
{
+ unsigned long val;
int i;
- if (dim == 0) {
- /* Single data item */
- call->bulk_item = data;
- call->bulk_count = itemsize;
- call->bulk_index = 0;
- rxrpc_dec_bytes(call);
- return 0;
+ /* First of all, we finish off the blob we're currently decoding
+ * into.
+ */
+ if (call->blob_size < manager->view.itemsize) {
+ rxrpc_dec_blob(call);
+ if (rxrpc_post_dec(call) < 0)
+ return -1;
+ if (call->blob_size < manager->view.itemsize)
+ goto need_more;
}
- for (i = 0; i < *dim_size; i++) {
- if (py_dec_numpy_array(call, data + i * *dim_stride,
- dim - 1, dim_size + 1, dim_stride + 1, itemsize) < 0)
- return -1;
- if (dim == 1 && rxrpc_post_dec(call) < 0)
- return -1;
+ /* Now we increment the per-dimension counters, overflowing and
+ * wrapping when a counter reaches the limit. When the outermost
+ * wraps, we are done.
+ */
+ for (i = manager->view.ndim - 1; i >= 0; i--) {
+ val = manager->indices[i].val;
+ if (val + 1 < manager->view.shape[i]) {
+ manager->indices[i].val = val + 1;
+ goto incremented;
+ }
+ manager->indices[i].val = 0;
}
- return 0;
+
+ return 0; /* Complete */
+
+incremented:
+ /* Recalculate the cached pointers */
+ for (; i <= manager->view.ndim - 1; i++) {
+ void *ptr = manager->indices[i - 1].ptr;
+ ptr += manager->indices[i].val * manager->view.strides[i];
+ manager->indices[i].ptr = ptr;
+ }
+
+ call->blob_offset = 0;
+ call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+ return 1;
}
/*
- * Recursively decode a Python Imaging Library (PIL)-style array.
+ * Recursively decode into a Python Imaging Library (PIL)-style array.
*/
-static int py_dec_pil_array(struct rx_call *call,
- void *data, int dim,
- const Py_ssize_t *dim_size,
- const Py_ssize_t *dim_stride,
- const Py_ssize_t *dim_suboffset,
- Py_ssize_t itemsize)
+static int py_dec_pil_array(struct rx_call *call, struct py_dec_manager *manager)
{
+ unsigned long val;
int i;
- if (dim == 0) {
- /* Single data item */
- call->bulk_item = data;
- call->bulk_count = itemsize;
- call->bulk_index = 0;
- rxrpc_dec_bytes(call);
- return 0;
+ /* First of all, we finish off the blob we're currently decoding
+ * into.
+ */
+ if (call->blob_size < manager->view.itemsize) {
+ rxrpc_dec_blob(call);
+ if (rxrpc_post_dec(call) < 0)
+ return -1;
+ if (call->blob_size < manager->view.itemsize)
+ goto need_more;
}
- for (i = 0; i < *dim_size; i++) {
- void *ptr = data + i * *dim_stride;
- if (*dim_suboffset >= 0) {
+ /* Now we increment the per-dimension counters, overflowing and
+ * wrapping when a counter reaches the limit. When the outermost
+ * wraps, we are done.
+ */
+ for (i = manager->view.ndim - 1; i >= 0; i--) {
+ val = manager->indices[i].val;
+ if (val + 1 < manager->view.shape[i]) {
+ manager->indices[i].val = val + 1;
+ goto incremented;
+ }
+ manager->indices[i].val = 0;
+ }
+
+ return 0; /* Complete */
+
+incremented:
+ /* Recalculate the cached pointers */
+ for (; i <= manager->view.ndim - 1; i++) {
+ void *ptr = manager->indices[i - 1].ptr;
+ ptr += manager->indices[i].val * manager->view.strides[i];
+ if (manager->view.suboffsets[i] >= 0) {
ptr = *((void *const *)ptr);
- ptr += *dim_suboffset;
+ ptr += manager->view.suboffsets[i];
}
- if (py_dec_pil_array(call, ptr,
- dim - 1, dim_size + 1, dim_stride + 1, dim_suboffset + 1,
- itemsize) < 0)
- return -1;
- if (dim == 1 && rxrpc_post_dec(call) < 0)
- return -1;
+ manager->indices[i].ptr = ptr;
}
- return 0;
+
+ call->blob_offset = 0;
+ call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+ return 1;
}
/*
* Decode the contents of an opaque type
*/
-int py_dec_opaque(struct rx_call *call, PyObject *obj)
+int py_dec_into_buffer(struct rx_call *call)
{
- Py_buffer view;
- int i;
+ struct py_dec_manager *manager = call->decoder_manager;
+ int ret;
if (call->error_code)
return -1;
- if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0)
- return -1;
+ if (call->blob == &rxgen_dec_padding_sink) {
+ rxrpc_dec_blob(call);
+ if (rxrpc_post_dec(call) == -1)
+ return -1;
+ return (call->blob_offset == call->blob_size) ? 0 : 1;
+ }
- if (0) {
- printf("PEBUF: l=%zu isz=%zd {", view.len, view.itemsize);
- if (view.shape)
- for (i = 0; i < view.ndim; i++)
- printf(" %zd", view.shape[i]);
- printf(" }\n");
+ if (manager->view.ndim == 0 || (manager->view.ndim == 1 && !manager->view.shape)) {
+ /* Single scalar */
+ rxrpc_dec_blob(call);
+ if (rxrpc_post_dec(call) == -1)
+ return -1;
+ ret = (call->blob_offset == call->blob_size) ? 0 : 1;
+ } else if (!manager->view.strides)
+ ret = py_dec_c_array(call, manager);
+ else if (!manager->view.suboffsets)
+ ret = py_dec_numpy_array(call, manager);
+ else
+ ret = py_dec_pil_array(call, manager);
+
+ if (ret <= 0) {
+ PyBuffer_Release(&manager->view);
+ free(call->decoder_manager);
+ call->decoder_manager = NULL;
}
- if (view.ndim == 0 || (view.ndim == 1 && !view.shape)) {
- /* Simple scalar array */
- call->bulk_item = view.buf;
- call->bulk_count = view.len;
- call->bulk_index = 0;
- rxrpc_dec_bytes(call);
- if (rxrpc_post_dec(call) < 0)
- goto error;
- goto done;
+ if (ret == 0 && call->padding_size > 0) {
+ /* Soak up the padding to a 32-bit boundary */
+ call->blob = &rxgen_dec_padding_sink;
+ call->blob_size = 4 - (call->blob_size & 3);
+ call->blob_offset = 0;
+ return 1;
+ }
+
+ return ret;
+}
+
+/*
+ * Initialise buffer decode.
+ *
+ * Returns 0 if there's nothing to do, 1 if the decode is set up and -1 on error,
+ */
+int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded)
+{
+ struct py_dec_manager *manager = NULL;
+ size_t size;
+ int i;
+
+ if (call->decoder_manager) {
+ manager = call->decoder_manager;
+ PyBuffer_Release(&manager->view);
+ free(manager);
+ manager = NULL;
+ call->decoder_manager = NULL;
+ }
+
+ debug("INIT_BUFFER: l=%zu isz=%zd nd=%d\n", view->len, view->itemsize, view->ndim);
+ call->need_size = view->len;
+ if (view->len == 0) {
+ PyBuffer_Release(view);
+ return 0;
+ }
+
+ call->padding_size = padded ? 4 - (view->len & 3) : 0;
+
+ size = sizeof(struct py_dec_manager);
+ size += view->ndim * sizeof(struct py_dec_index);
+ manager = malloc(size);
+ if (!manager) {
+ PyBuffer_Release(view);
+ PyErr_NoMemory();
+ return -1;
}
- if (!view.strides) {
+ memset(manager, 0, size);
+ memcpy(&manager->view, view, sizeof(*view));
+ manager->indices[-1].ptr = manager->view.buf;
+
+ call->blob_size = manager->view.itemsize;
+
+ if (manager->view.ndim == 0) {
+ call->blob_size = manager->view.len;
+ } else if (manager->view.ndim == 1 && !manager->view.shape) {
+ manager->indices[0].ptr = manager->indices[-1].ptr;
+ call->blob_size = manager->view.len;
+ } else if (!manager->view.strides) {
/* Standard C array */
- if (py_dec_c_array(call, view.buf, view.ndim, view.shape, view.itemsize) < 0)
- goto error;
- goto done;
+ for (i = 0; i <= manager->view.ndim - 1; i++) {
+ void **ptrs = manager->indices[i - 1].ptr;
+ void *ptr = ptrs[manager->indices[i].val];
+ manager->indices[i].ptr = ptr;
+ }
+ } else if (!manager->view.suboffsets) {
+ /* NumPy-style Python array */
+ for (i = 0; i <= manager->view.ndim - 1; i++) {
+ void *ptr = manager->indices[i - 1].ptr;
+ ptr += manager->indices[i].val * manager->view.strides[i];
+ manager->indices[i].ptr = ptr;
+ }
+ } else {
+ /* PIL-style Python array */
+ for (i = 0; i <= manager->view.ndim - 1; i++) {
+ void *ptr = manager->indices[i - 1].ptr;
+ ptr += manager->indices[i].val * manager->view.strides[i];
+ if (manager->view.suboffsets[i] >= 0) {
+ ptr = *((void *const *)ptr);
+ ptr += manager->view.suboffsets[i];
+ }
+ manager->indices[i].ptr = ptr;
+ }
}
- if (!view.suboffsets) {
- if (py_dec_numpy_array(call, view.buf, view.ndim, view.shape, view.strides,
- view.itemsize) < 0)
- goto error;
- goto done;
+ /* Set the first blob to be decoded */
+ call->blob = manager->indices[manager->view.ndim - 1].ptr;
+ call->blob_offset = 0;
+ call->decoder_manager = manager;
+ return 1;
+}
+
+/*
+ * Initialise the decoding of the contents of an opaque type
+ */
+int py_dec_init_opaque(struct rx_call *call, PyObject *obj)
+{
+ Py_buffer view;
+ int ret;
+
+ if (call->error_code)
+ return -1;
+
+ if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0) {
+ debug("*** GET BUFFER FAILED\n");
+ return -1;
}
- /* PIL-style Python array */
- if (py_dec_pil_array(call, view.buf, view.ndim, view.shape, view.strides,
- view.suboffsets, view.itemsize) < 0)
- goto error;
+ ret = py_dec_init_buffer(call, &view, true);
+ if (ret == -1)
+ return -1;
-done:
- PyBuffer_Release(&view);
- return call->error_code ? -1 : 0;
+ return (ret < 0 || call->error_code) ? -1 : 0;
+}
-error:
- PyBuffer_Release(&view);
- return -1;
+/*
+ * Decode a string of bytes into a preallocated unicode string python object.
+ */
+int py_dec_into_string(struct rx_call *call)
+{
+ PyObject *str = call->blob;
+ unsigned needed, segment, i;
+
+ rxrpc_post_dec(call);
+
+ needed = call->blob_size - call->blob_offset;
+ segment = call->data_stop - call->data_cursor;
+ debug("DEC STR dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment);
+
+ if (segment > 0) {
+ if (segment > needed)
+ segment = needed;
+ if (call->blob != &rxgen_dec_padding_sink) {
+ for (i = 0; i < segment; i++)
+ PyUnicode_WRITE(PyUnicode_KIND(str), PyUnicode_DATA(str),
+ call->blob_offset + i, call->data_cursor[i]);
+ }
+ call->blob_decoded += segment;
+ call->blob_offset += segment;
+ call->data_cursor += segment;
+ if (call->blob_size <= call->blob_offset) {
+ if (call->blob != &rxgen_dec_padding_sink && call->padding_size > 0) {
+ /* Soak up the padding to a 32-bit boundary */
+ call->blob = &rxgen_dec_padding_sink;
+ call->blob_size = call->padding_size;
+ call->blob_offset = 0;
+ return 1;
+ }
+ return 0;
+ }
+ }
+
+ rxrpc_dec_advance_buffer(call);
+ return 1;
+}
+
+/*
+ * Decode a string of bytes into a preallocated unicode string python object.
+ */
+int py_dec_init_string(struct rx_call *call, PyObject **_str)
+{
+ PyObject *str;
+
+ debug("INIT STRING %u\n", call->blob_size);
+
+ str = PyUnicode_New(call->blob_size, 255);
+ if (!str)
+ return -1;
+ *_str = str;
+ if (call->blob_size == 0)
+ return 0;
+
+ if (PyUnicode_READY(str) < 0) {
+ debug("*** STRING NON-CANONICAL\n");
+ abort();
+ }
+
+ call->blob = str;
+ call->blob_offset = 0;
+ call->padding_size = 4 - (call->blob_size & 3);
+ return 1;
}
+/*
+ * Comparator for binary searching the abort code table
+ */
static int py_rxgen_received_abort_cmp(const void *key, const void *_entry)
{
const struct kafs_abort_list *entry = _entry;
else
return PyErr_Format(ex, "Aborted %u", call->abort_code);
}
+
+/*
+ * Clean up a call after decoding
+ */
+void py_rxgen_decoder_cleanup(struct rx_call *call)
+{
+ if (call->decoder_manager) {
+ struct py_dec_manager *manager = call->decoder_manager;
+ PyBuffer_Release(&manager->view);
+ free(manager);
+ }
+ Py_XDECREF(call->decoder_split_callback);
+ Py_XDECREF(call->decoder_split_info);
+}
#ifndef _PY_RXGEN_H
#define _PY_RXGEN_H
+#include "rxgen.h"
+
struct py_rx_connection {
PyObject_HEAD
struct rx_connection *x;
split_dead
};
-struct py_rx_split_call {
+struct py_rx_split_info {
PyObject_HEAD
struct rx_call *call;
- enum py_rx_split_state state;
+ PyObject *target;
+ enum py_rx_split_state split_state;
+ unsigned state;
+ bool receiving_data;
};
extern PyTypeObject py_rx_connectionType;
extern PyObject *kafs_py_string_to_key(PyObject *, PyObject *);
extern int py_rxgen_initialise_members(PyObject *obj, PyObject *kw);
+extern void py_rxgen_decoder_cleanup(struct rx_call *call);
/*
* Single embedded struct handling
*/
extern PyObject *py_rxgen_get_string(const void *_p, size_t n);
extern int py_rxgen_set_string(void *_p, size_t n, PyObject *val);
-extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align);
+extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim);
extern int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim);
-extern void py_dec_string(struct rx_call *call);
-extern int py_dec_opaque(struct rx_call *call, PyObject *obj);
+extern int py_dec_into_buffer(struct rx_call *call);
+extern int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded);
+extern int py_dec_init_opaque(struct rx_call *call, PyObject *obj);
+extern int py_dec_into_string(struct rx_call *call);
+extern int py_dec_init_string(struct rx_call *call, PyObject **_str);
/*
* Embedded general array handling
/*
* Split-mode RPC call handling
*/
-extern PyTypeObject py_rx_split_callType;
+extern PyTypeObject py_rx_split_infoType;
extern PyObject *py_rxgen_split_client_prepare(void);
-extern int py_rxgen_split_client_transmit(PyObject *split, PyObject *split_call);
-extern int py_rxgen_split_client_receive(PyObject *split, PyObject *split_call);
+extern int py_rxgen_split_transmit(struct rx_call *call);
+extern int py_rxgen_split_receive(struct rx_call *call, bool init);
-static inline void py_rxgen_split_client_set(PyObject *_split_call, struct rx_call *call)
+static inline void py_rxgen_split_client_set(struct rx_call *call,
+ PyObject *split_callback,
+ PyObject *_split_info)
{
- struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
- split_call->call = call;
+ struct py_rx_split_info *split_info = (struct py_rx_split_info *)_split_info;
+
+ split_info->call = call;
+ Py_INCREF(split_callback);
+ call->decoder_split_callback = split_callback;
+ call->decoder_split_info = _split_info;
}
#endif /* _PY_RXGEN_H */
#include "afs_py.h"
#include "rxgen.h"
+#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+
+static int py_rx_split_do_recv(struct rx_call *call,
+ struct py_rx_split_info *split_info)
+{
+ struct pollfd fds[1];
+
+ fds[0].fd = call->conn->fd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+ poll(fds, 1, 0);
+ if (fds[0].revents & POLLIN)
+ rxrpc_recv_data(call->conn, true);
+ return 0;
+}
+
+static int py_rx_split_do_send_recv(struct rx_call *call,
+ struct py_rx_split_info *split_info,
+ bool more)
+{
+ if (!more) {
+ call->more_send = 0;
+ split_info->split_state = split_idle;
+ }
+ if (rxrpc_send_data(call) == -1)
+ return -1;
+
+ return py_rx_split_do_recv(call, split_info);
+}
+
/*
- * Send an RPC call: split.send(data, more)
+ * Send an RPC call: split_info.send(data, more)
*/
static PyObject *py_rx_split_send(PyObject *_self, PyObject *args)
{
- struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
- struct rx_call *call = self->call;
- struct pollfd fds[1];
+ struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+ struct rx_call *call = split_info->call;
Py_buffer data;
- int more;
+ int more = 0;
if (!call)
abort();
- if (self->state != split_transmitting)
+ if (split_info->split_state != split_transmitting)
abort();
if (!PyArg_ParseTuple(args, "y*p", &data, &more))
return NULL;
- if (py_enc_buffer_raw(call, &data, UINT_MAX, false) == -1)
+ if (py_enc_buffer_raw(call, &data, UINT_MAX) == -1)
return PyErr_SetFromErrno(PyExc_OSError);
- if (!more) {
- call->more = 0;
- self->state = split_idle;
- }
- if (rxrpc_send_data(call) == -1)
+ if (py_rx_split_do_send_recv(call, split_info, more) < 0)
return PyErr_SetFromErrno(PyExc_OSError);
- fds[0].fd = call->conn->fd;
- fds[0].events = POLLIN;
- fds[0].revents = 0;
- poll(fds, 1, 0);
- if (fds[0].revents & POLLIN)
- rxrpc_recv_data(call->conn, true);
+ Py_RETURN_NONE;
+}
+
+/*
+ * Request reception of some data, where the amount of data is potentially
+ * zero. This can only be used if there is no non-split data to be received
+ * after the split data as this commits the caller to handling all the
+ * remaining data in this phase.
+ *
+ * The caller should use split_info.data_available() to work out how much data
+ * is available, if any.
+ */
+static PyObject *py_rx_split_will_recv_all(PyObject *_self, PyObject *args)
+{
+ struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+ struct rx_call *call = split_info->call;
+ call->need_size = UINT_MAX;
Py_RETURN_NONE;
}
/*
- * Recv an RPC call: v = split.recv(n)
+ * Receive a fixed-size object: ret = split_info.begin_recv(buffer)
+ *
+ * The object's intrinsic size is used to determine how much data we want for
+ * it. Potentially 0-length, variable sized data cannot be handled this way.
+ *
+ * Returns NULL on error; True on success (ie. may be more to receive).
*/
-static PyObject *py_rx_split_recv(PyObject *_self, PyObject *args)
+static PyObject *py_rx_split_begin_recv(PyObject *_self, PyObject *args)
{
- struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
- struct rx_call *call = self->call;
+ struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+ struct rx_call *call = split_info->call;
+ Py_buffer buffer;
+ int ret, padded = 1;
- if (self->state != split_receiving)
+ if (!call)
+ abort();
+ if (split_info->split_state != split_receiving)
abort();
- abort();
- return NULL;
+ if (!PyArg_ParseTuple(args, "w*|p", &buffer, &padded))
+ return NULL;
+
+ ret = py_dec_init_buffer(call, &buffer, padded);
+ switch (ret) {
+ case -1: return NULL;
+ case 0: split_info->state++; Py_RETURN_TRUE;
+ case 1: split_info->receiving_data = true; Py_RETURN_TRUE;
+ default: abort();
+ }
+}
+
+/*
+ * Query how much data is in the Rx buffers of an RPC call: n = split_info.data_available()
+ */
+static PyObject *py_rx_split_data_available(PyObject *_self, PyObject *args)
+{
+ struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+ struct rx_call *call = split_info->call;
+
+ switch (call->state) {
+ case rx_call_cl_decoding_response:
+ case rx_call_sv_decoding_opcode:
+ case rx_call_sv_decoding_params:
+ break;
+ default:
+ /* Aborted or some other error */
+ return NULL;
+ }
+
+ debug("DAVAIL: %u %d\n", call->data_count, call->more_recv);
+
+ if (call->data_count > 0 || call->more_recv)
+ return PyLong_FromUnsignedLong(call->data_count);
+ else
+ Py_RETURN_NONE;
}
/*
* Methods applicable to split-call objects
*/
-static PyMethodDef py_rx_split_methods[] = {
+static PyMethodDef py_rx_split_info_methods[] = {
{"send", (PyCFunction)py_rx_split_send, METH_VARARGS,
"" },
- {"recv", (PyCFunction)py_rx_split_recv, METH_VARARGS,
+ {"will_recv_all", (PyCFunction)py_rx_split_will_recv_all, METH_NOARGS,
"" },
+ {"begin_recv", (PyCFunction)py_rx_split_begin_recv, METH_VARARGS,
+ "" },
+ {"data_available", (PyCFunction)py_rx_split_data_available, METH_NOARGS,
+ "" },
+ {}
+};
+
+static PyMemberDef py_rx_split_info_members[] = {
+ { "state", T_UINT, offsetof(struct py_rx_split_info, state), 0, ""},
+ { "target", T_OBJECT, offsetof(struct py_rx_split_info, target), 0, ""},
{}
};
static int
py_rx_split_init(PyObject *_self, PyObject *args, PyObject *kwds)
{
- struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
+ struct py_rx_split_info *self = (struct py_rx_split_info *)_self;
self->call = NULL;
+ self->target = NULL;
+ self->split_state = split_idle;
+ self->state = 0;
+ self->receiving_data = false;
return 0;
}
static void
-py_rx_split_dealloc(struct py_rx_split_call *self)
+py_rx_split_dealloc(struct py_rx_split_info *self)
{
- assert(self->state != split_dead);
- self->state = split_dead;
+ assert(self->split_state != split_dead);
+ self->split_state = split_dead;
self->call = NULL;
+ Py_XDECREF(self->target);
Py_TYPE(self)->tp_free((PyObject *)self);
}
-PyTypeObject py_rx_split_callType = {
+PyTypeObject py_rx_split_infoType = {
PyVarObject_HEAD_INIT(NULL, 0)
- "kafs.rx_split_call", /*tp_name*/
- sizeof(struct py_rx_split_call), /*tp_basicsize*/
+ "kafs.rx_split_info", /*tp_name*/
+ sizeof(struct py_rx_split_info), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)py_rx_split_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
- py_rx_split_methods, /* tp_methods */
- 0, /* tp_members */
+ py_rx_split_info_methods, /* tp_methods */
+ py_rx_split_info_members, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
};
/*
- * Check a split-RPC object has transmit and receive functions.
+ * Allocate an information object.
*/
PyObject *py_rxgen_split_client_prepare(void)
{
- struct py_rx_split_call *obj;
+ struct py_rx_split_info *obj;
- obj = (struct py_rx_split_call *)_PyObject_New(&py_rx_split_callType);
+ obj = (struct py_rx_split_info *)_PyObject_New(&py_rx_split_infoType);
if (!obj)
return PyErr_NoMemory();
py_rx_split_init((PyObject *)obj, NULL, NULL);
/*
* Perform split-RPC transmission.
*/
-int py_rxgen_split_client_transmit(PyObject *split, PyObject *_split_call)
+int py_rxgen_split_transmit(struct rx_call *call)
{
- struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
+ struct py_rx_split_info *split_info = call->decoder_split_info;
+ PyObject *callback = call->decoder_split_callback;
PyObject *result;
int ret;
- assert(split_call->state == split_idle);
+ assert(split_info->split_state == split_idle);
- split_call->state = split_transmitting;
- result = PyObject_CallMethod(split, "client_transmit", "O", _split_call);
- if (result) {
+ split_info->split_state = split_transmitting;
+ result = PyObject_CallMethod(callback, "transmit",
+ "O", call->decoder_split_info);
+ if (result == Py_None) {
+ ret = py_rx_split_do_send_recv(call, split_info, false);
+ } else if (result) {
ret = PyObject_IsTrue(result) ? 0 : -1;
Py_DECREF(result);
} else {
ret = -1;
}
- if (ret == 0 && split_call->state != split_idle)
+ if (ret == 0 && split_info->split_state != split_idle)
abort();
return ret;
}
/*
* Perform split-RPC reception.
*/
-int py_rxgen_split_client_receive(PyObject *split, PyObject *_split_call)
+int py_rxgen_split_receive(struct rx_call *call, bool init)
{
- struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
+ struct py_rx_split_info *split_info = call->decoder_split_info;
+ PyObject *callback = call->decoder_split_callback;
PyObject *result;
int ret;
- assert(split_call->state == split_idle);
+ assert(split_info->split_state == split_idle);
- split_call->state = split_receiving;
- result = PyObject_CallMethod(split, "client_receive", "O", _split_call);
- split_call->state = split_idle;
- assert(result != NULL);
+ debug("-->%s(%u,%u,%u)\n", __func__, init, split_info->state, split_info->receiving_data);
- if (result) {
- ret = PyObject_IsTrue(result) ? 0 : -1;
- Py_DECREF(result);
+ if (init)
+ split_info->state = 0;
+
+again:
+ if (split_info->receiving_data) {
+ switch (py_dec_into_buffer(call)) {
+ case -1:
+ result = PyObject_CallMethod(callback, "receive_failed",
+ "O", call->decoder_split_info);
+ Py_XDECREF(result);
+ return -1;
+ case 1: /* More data needed */
+ return 1;
+ case 0:
+ split_info->receiving_data = false;
+ split_info->state++;
+ break;
+ }
+ }
+
+ split_info->split_state = split_receiving;
+ result = PyObject_CallMethod(callback, "receive",
+ "O", call->decoder_split_info);
+ split_info->split_state = split_idle;
+
+ if (!result)
+ return -1;
+
+ if (result == Py_None) {
+ /* Split receive phase not used */
+ call->need_size = 0;
+ ret = 0;
+ } else if (result == Py_False) {
+ /* Split receive phase complete */
+ ret = 0;
+ } else if (result == Py_True) {
+ /* Wanting to receive data or reenter in a new state */
+ if (split_info->receiving_data)
+ goto again;
+ ret = 1;
} else {
+ PyErr_Format(PyExc_TypeError,
+ "Expected True, False or None return from split receive function");
ret = -1;
}
+ Py_DECREF(result);
return ret;
}
#include "af_rxrpc.h"
#include <stdbool.h>
#include <errno.h>
+#include <stdlib.h>
typedef uint32_t net_xdr_t;
enum rx_call_state state;
unsigned known_to_kernel : 1;
unsigned secured : 1;
- unsigned more : 1;
+ unsigned more_send : 1;
+ unsigned more_recv : 1;
int error_code;
uint32_t abort_code;
unsigned need_size;
- unsigned long long bytes_sent, bytes_received;
+ unsigned long long bytes_sent, bytes_received, blob_decoded;
/* Service routines */
void (*processor)(struct rx_call *call);
struct rx_buf *buffer_tail;
unsigned data_count;
unsigned buffer_space;
+ unsigned padding_size;
/* Decoding support */
unsigned phase; /* Encode/decode phase */
+ unsigned blob_size; /* Size of blob being encoded/decoded */
+ unsigned blob_offset; /* Offset into blob */
unsigned bulk_count; /* Number of items in bulk array */
unsigned bulk_index; /* Index of item being processed */
+ void *blob; /* Blob being encoded/decoded */
union {
void *bulk_item; /* Pointer to string/bytes/struct being processed */
uint32_t bulk_u32; /* 8/16/32-bit integer being processed */
};
int (*decoder)(struct rx_call *call);
void *decoder_private;
+ void *decoder_manager;
+ void *decoder_split_callback;
+ void *decoder_split_info;
+ void (*decoder_cleanup)(struct rx_call *call);
};
+extern uint32_t rxgen_dec_padding_sink;
-extern int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size);
+extern int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size);
extern void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data);
static inline void rxrpc_enc(struct rx_call *call, uint32_t data)
{
uint32_t xdr_data = htonl(data);
- if (__builtin_expect(call->data_cursor < call->data_stop, 1)) {
+ if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 &&
+ call->data_cursor < call->data_stop, 1)) {
*(net_xdr_t *)call->data_cursor = xdr_data;
call->data_cursor += 4;
} else {
static inline void rxrpc_enc_align(struct rx_call *call)
{
+ abort(); // Can't assume data_cursor is 4-byte aligned
while ((unsigned long)call->data_cursor & 3)
*(call->data_cursor++) = 0;
}
}
}
-extern void rxrpc_dec_bytes(struct rx_call *call);
+extern void rxrpc_dec_blob(struct rx_call *call);
extern uint32_t rxrpc_dec_slow(struct rx_call *call);
static inline uint32_t rxrpc_dec(struct rx_call *call)
{
- if (__builtin_expect(call->data_cursor < call->data_stop, 1)) {
+ if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 &&
+ call->data_cursor < call->data_stop, 1)) {
net_xdr_t x = *(net_xdr_t *)call->data_cursor;
call->data_cursor += sizeof(x);
return ntohl(x);
static inline void rxrpc_dec_align(struct rx_call *call)
{
unsigned long cursor = (unsigned long)call->data_cursor;
+ abort(); // Can't assume data_cursor is 4-byte aligned
cursor = (cursor + 3) & ~3UL;
call->data_cursor = (uint8_t*)cursor;
}
size_t n = call->data_cursor - call->data_start;
call->data_start = call->data_cursor;
call->data_count -= n;
+ call->need_size -= n;
return 0;
} else {
errno = call->error_code;
}
}
+extern void rxrpc_dec_advance_buffer(struct rx_call *call);
extern int rxgen_dec_discard_excess(struct rx_call *call);
extern struct rx_connection *rx_new_connection(const struct sockaddr *sa,
if ($p->{class} eq "array") {
die $p->{where}, ": Array arg not supported";
- } elsif ($p->{class} eq "bulk" &&
- !($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque")
- ) {
+ } elsif ($p->{class} eq "bulk") {
# Encode
if ($p->{elem}->{class} eq "struct") {
$proto = "int (*get__" . $p->{name} . ")(struct rx_call *call, void *token)";
push @declines, "void *token__" . $p->{name};
push @declines, "size_t nr__" . $p->{name};
push @args, "token__" . $p->{name};
- } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque")) {
+ } elsif ($p->{class} eq "blob") {
$proto = $p->{type} . " " . $p->{ptr} . $p->{name};
push @enclines, "size_t nr__" . $p->{name};
push @enclines, "const " . $proto;
) {
$phase->{size} += $p->{xdr_size};
push @{$phase->{params}}, $p;
+ } elsif ($p->{class} eq "blob") {
+ $have_bulk = 2;
+
+ # Bulk objects begin with an element count
+ $phase->{elem_count} = $phase->{size};
+ $phase->{size} += 4;
+
+ my %pseudoparam = (
+ class => "basic",
+ type => "blob_size",
+ name => $p->{name},
+ elem => $p->{elem},
+ where => $p->{where},
+ xdr_size => $p->{xdr_size},
+ );
+ push @{$phase->{params}}, \%pseudoparam;
+
+ # Create a new phase
+ $phase = {
+ type => "blob",
+ name => $p->{name},
+ elem => $p->{elem},
+ params => [ $p ],
+ xdr_size => $p->{xdr_size},
+ size => $p->{xdr_size},
+ };
+ push @phases, $phase;
+
+ $phase->{size} = 4;
+ $phase = 0;
} elsif ($p->{class} eq "bulk") {
$have_bulk = 1 if ($have_bulk == 0);
- if ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque") {
- $have_bulk = 2;
- }
# Bulk objects begin with an element count
$phase->{elem_count} = $phase->{size};
};
push @phases, $phase;
- if ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque") {
- $phase->{size} = 4;
- } else {
- # We don't want to be sending one object at a time if they're
- # really small.
- my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
- $n_buf *= $p->{xdr_size};
- $phase->{size} = $p->{xdr_size};
- }
+ # We don't want to be sending one object at a time if they're
+ # really small.
+ my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
+ $n_buf *= $p->{xdr_size};
+ $phase->{size} = $p->{xdr_size};
$phase = 0;
} else {
die $p->{where}, "Encode array arg not supported";
unless (@params) {
die if ($side eq "client");
- print RXOUT "\tcall->more = 0;\n";
+ print RXOUT "\tcall->more_send = 0;\n";
print RXOUT "\treturn 0;\n";
print RXOUT "}\n";
return;
print RXOUT "\tswitch (phase) {\n";
print RXOUT "\tcase 0:\n";
- print RXOUT "\t\tcall->more = 1;\n";
+ print RXOUT "\t\tcall->more_send = 1;\n";
my $phase_goto_label = 0;
my $phix;
}
# Determine how big bulk objects are
- if ($phase->{type} eq "bulk") {
+ if ($phase->{type} eq "blob") {
+ my $p = $phase->{params}->[0];
+ print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n";
+ print RXOUT "\t\tif (call->blob_size == 0)\n";
+ print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+ $phase_goto_label = $phix + 1;
+ print RXOUT "\t\tcall->blob_offset = 0;\n";
+ } elsif ($phase->{type} eq "bulk") {
my $p = $phase->{params}->[0];
print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n";
print RXOUT "\t\tif (call->bulk_count == 0)\n";
# Marshal the data
foreach my $p (@{$phase->{params}}) {
- if ($p->{type} eq "bulk_size") {
+ if ($p->{type} eq "blob_size") {
+ print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n";
+ $close_phase = 0;
+ next;
+ } elsif ($p->{type} eq "bulk_size") {
print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n";
$close_phase = 0;
next;
die $p->{where}, ": Unsupported type in decode";
}
- if ($p->{class} eq "bulk") {
+ if ($p->{class} eq "blob") {
+ print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+ print RXOUT "\t\t\tphase = ", $phix, ";\n";
+ print RXOUT "\t\t\tgoto select_phase;\n";
+ print RXOUT "\t\t}\n";
+ } elsif ($p->{class} eq "bulk") {
print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
print RXOUT "\t\t\tphase = ", $phix, ";\n";
print RXOUT "\t\t\tgoto select_phase;\n";
print RXOUT "\tcase ", $phix, ":\n";
print RXOUT "\t\tif (rxrpc_post_enc(call) < 0)\n";
print RXOUT "\t\t\treturn -1;\n";
- print RXOUT "\t\tcall->more = 0;\n";
+ print RXOUT "\t\tcall->more_send = 0;\n";
print RXOUT "\t\tbreak;\n";
print RXOUT "\t}\n";
) {
$phase->{size} += $p->{xdr_size};
push @{$phase->{params}}, $p;
+ } elsif ($p->{class} eq "blob") {
+ $have_bulk = 1;
+
+ # Bulk objects begin with an element count
+ $phase->{elem_count} = $phase->{size};
+ $phase->{size} += 4;
+
+ my %pseudoparam = (
+ class => "basic",
+ type => "blob_size",
+ name => $p->{name},
+ elem => $p->{elem},
+ where => $p->{where},
+ xdr_size => $p->{xdr_size},
+ );
+ push @{$phase->{params}}, \%pseudoparam;
+
+ # Create a new phase
+ $phase = {
+ type => "blob",
+ name => $p->{name},
+ params => [ $p ],
+ size => 4,
+ xdr_size => $p->{xdr_size},
+ };
+ push @phases, $phase;
+ $phase = 0;
} elsif ($p->{class} eq "bulk") {
$have_bulk = 1;
};
push @phases, $phase;
- if ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque") {
- $phase->{bytearray} = 1;
- }
-
# We don't want to be asking recvmsg() for one object at a time if
# they're really small.
my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
}
# Determine how big bulk objects are
- if ($phase->{type} eq "bulk") {
+ if ($phase->{type} eq "blob") {
+ my $p = $phase->{params}->[0];
+ print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n";
+ print RXOUT "\t\tcall->blob_offset = UINT_MAX;\n";
+ print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
+ print RXOUT "\t\t\treturn -1;\n";
+ print RXOUT "\t\tif (call->blob_size == 0)\n";
+ print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+ $phase_goto_label = $phix + 1;
+ print RXOUT "\t\tcall->blob_offset = 0;\n";
+ } elsif ($phase->{type} eq "bulk") {
my $p = $phase->{params}->[0];
print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n";
print RXOUT "\t\tcall->bulk_index = UINT_MAX;\n";
if ($p->{elem}->{class} eq "basic") {
print RXOUT "\t\tif (", $ptr, "store__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
- } elsif ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque") {
- print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
} else {
print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
}
print RXOUT "\tcase ", $phix, ":\n";
print RXOUT "\t\tif (count < ", $phase->{size}, ")";
- if ($phase->{type} eq "bulk" && !exists($phase->{bytearray}) &&
- $phase->{xdr_size} <= 512) {
+ if ($phase->{type} eq "bulk" &&
+ $phase->{xdr_size} <= 512
+ ) {
print RXOUT " {\n";
print RXOUT "\t\t\tunsigned n = call->bulk_count - call->bulk_index;\n";
print RXOUT "\t\t\tn = MIN(n, ", int(1024 / $phase->{xdr_size}), ");\n";
# Unmarshal the data
print RXOUT "\n";
foreach my $p (@{$phase->{params}}) {
- if ($p->{type} eq "bulk_size") {
+ if ($p->{type} eq "blob_size") {
+ print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n";
+ next;
+ } elsif ($p->{type} eq "bulk_size") {
print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n";
next;
}
print RXOUT "\t\t\treturn -1;\n";
print RXOUT "\t\trxgen_decode_", $p->{type}, "(call, call->bulk_item);\n";
print RXOUT "\t\tcall->bulk_index++;\n";
- } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque")) {
- print RXOUT "\t\trxrpc_dec_bytes(call);\n";
+ } elsif ($p->{class} eq "blob") {
+ print RXOUT "\t\trxrpc_dec_blob(call);\n";
print RXOUT "\t\trxrpc_dec_align(call);\n";
} elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
print RXOUT "\t\t", $ptr, $p->{name}, " = rxrpc_dec(call);\n";
die $p->{where}, ": Unsupported type in decode";
}
- if ($p->{class} eq "bulk") {
+ if ($p->{class} eq "blob") {
+ print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+ print RXOUT "\t\t\treturn -1;\n";
+ print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+ print RXOUT "\t\t\tphase = ", $phix, ";\n";
+ print RXOUT "\t\t\tgoto select_phase;\n";
+ print RXOUT "\t\t}\n";
+ } elsif ($p->{class} eq "bulk") {
print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
print RXOUT "\t\t\treturn -1;\n";
print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
}
}
- if ($phase->{type} ne "bulk") {
+ if ($phase->{type} ne "blob" || $phase->{type} ne "bulk") {
print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
print RXOUT "\t\t\treturn -1;\n";
}
print RXOUT "\tstruct rx_call *call;\n" if ($what eq "request");
- my @all_bulk_params = grep { $_->{class} eq "bulk"; } @{$params};
-
- my @bulk_params = grep { ($_->{elem}->{class} ne "string" &&
- $_->{elem}->{class} ne "opaque"); } @all_bulk_params;
+ my @blob_params = grep { $_->{class} eq "blob"; } @{$params};
+ my @bulk_params = grep { $_->{class} eq "bulk"; } @{$params};
# Local variables
print RXOUT "\tint ret;\n";
# Check lengths
- if (@all_bulk_params) {
+ if (@blob_params || @bulk_params) {
print RXOUT "\n";
print RXOUT "\tif (";
my $first = 1;
- foreach my $p (@all_bulk_params) {
+ foreach my $p (@blob_params) {
if ($first) {
$first = 0;
} else {
print RXOUT " ||\n\t ";
}
- if ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque") {
- print RXOUT "!", $p->{name};
+ print RXOUT "!", $p->{name};
+ if (exists($p->{dim})) {
+ print RXOUT " || nr__", $p->{name}, " > ", $p->{dim};
+ }
+ }
+ foreach my $p (@bulk_params) {
+ if ($first) {
+ $first = 0;
} else {
- print RXOUT "!get__", $p->{name};
+ print RXOUT " ||\n\t ";
}
+ print RXOUT "!get__", $p->{name};
if (exists($p->{dim})) {
print RXOUT " || nr__", $p->{name}, " > ", $p->{dim};
}
print RXOUT "\trxrpc_enc(call, (uint32_t)(", $p->{name}, " >> 32));\n";
} elsif ($p->{class} eq "struct") {
print RXOUT "\trxgen_encode_", $p->{type}, "(call, ", $p->{name}, ");\n";
- } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque")) {
- print RXOUT "\trxrpc_enc_bytes(call, ", $p->{name}, ", nr__", $p->{name}, ");\n";
+ } elsif ($p->{class} eq "blob") {
+ print RXOUT "\trxrpc_enc_blob(call, ", $p->{name}, ", nr__", $p->{name}, ");\n";
print RXOUT "\trxrpc_enc_align(call);\n";
+ } elsif ($p->{class} eq "blob") {
+ print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n";
+ print RXOUT "\tcall->blob_size = nr__", $p->{name}, ";\n";
+ print RXOUT "\tfor (call->blob_offset = 0; call->blob_offset < call->blob_size; call->blob_offset++) {\n";
+ if ($p->{elem}->{class} eq "struct") {
+ print RXOUT "\t\tstruct ", $p->{elem}->{type}, " x;\n";
+ } else {
+ print RXOUT "\t\t", $p->{elem}->{type}, " x;\n";
+ }
+ print RXOUT "\t\tcall->blob = &x;\n";
+ print RXOUT "\t\tif (get__", $p->{name}, "(call, token__", $p->{name}, ") < 0)\n";
+ print RXOUT "\t\t\tgoto error;\n";
+ die $p->{where}, "No decoding for array type '$type'";
+ print RXOUT "\t}\n";
} elsif ($p->{class} eq "bulk") {
print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n";
print RXOUT "\tcall->bulk_count = nr__", $p->{name}, ";\n";
print RXOUT "\tif (rxrpc_post_enc(call) < 0)\n";
print RXOUT "\t\tgoto error;\n";
- print RXOUT "\tcall->more = 0;\n";
+ print RXOUT "\tcall->more_send = 0;\n";
# Send the message
print RXOUT "\n";
if (@py_type_defs) {
print PYOUT "\tif (";
print PYOUT "PyType_Ready(&py_rx_connectionType) < 0 ||\n\t ";
- print PYOUT "PyType_Ready(&py_rx_split_callType) < 0";
+ print PYOUT "PyType_Ready(&py_rx_split_infoType) < 0";
my $first = 0;
foreach my $def (@py_type_defs) {
print PYOUT " ||\n\t " unless ($first);
push @complex, $p;
print PYHDR "\t\tPyObject\t*", $p->{name}, ";\n";
}
- $have_opaque = 1 if ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque");
+ $have_opaque = 1 if ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque");
}
print PYHDR "\t} x;\n";
print PYHDR "\tPy_buffer dec_buf;\n" if ($have_opaque);
print PYOUT "static PyMemberDef ", $struct_req, "_members[] = {\n";
foreach my $p (@{$params}) {
print PYOUT "\t{ \"", $p->{name}, "\", ";
- if ($p->{class} eq "bulk") { print PYOUT "T_OBJECT_EX";
+ if ($p->{class} eq "blob") { print PYOUT "T_OBJECT_EX";
+ } elsif ($p->{class} eq "bulk") { print PYOUT "T_OBJECT_EX";
} elsif ($p->{type} eq "char" ) { print PYOUT "T_CHAR";
} elsif ($p->{type} eq "int8_t" ) { print PYOUT "T_BYTE";
} elsif ($p->{type} eq "int16_t" ) { print PYOUT "T_SHORT";
foreach my $p (@{$func->{params}}) {
next if ($p->{class} ne "bulk");
- next if ($p->{elem}->{class} eq "string" || $p->{elem}->{class} eq "opaque");
# Data encoding
if (!exists $bulk_get_helpers{$p->{type}}) {
print PYOUT "\tstruct py_rx_connection *z_conn;\n";
print PYOUT "\tstruct py_", $func->{name}, "_response *response;\n";
foreach my $p (@{$func->{request}}) {
- if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque")) {
+ if ($p->{class} eq "blob") {
print PYOUT "\tPy_buffer param_", $p->{name}, ";\n";
} elsif ($p->{class} eq "basic") {
print PYOUT "\t", $p->{type}, " param_", $p->{name}, ";\n";
}
}
- print PYOUT "\tPyObject *split, *split_call;\n" if ($func->{split});
+ print PYOUT "\tPyObject *split_callback, *split_info;\n" if ($func->{split});
print PYOUT "\tPyObject *res = NULL;\n";
print PYOUT "\tint ret;\n";
} elsif ($p->{type} eq "uint32_t") { print PYOUT "I";
} elsif ($p->{type} eq "uint64_t") { print PYOUT "K";
} elsif ($p->{class} eq "struct") { print PYOUT "O!";
- } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") {
+ } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "string") {
print PYOUT "s*";
- } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") {
+ } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque") {
print PYOUT "z*";
- } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
- $p->{elem}->{class} eq "struct")) {
+ } elsif ($p->{class} eq "bulk") {
print PYOUT "O!";
} else {
die $p->{where}, ": No py parse for param";
foreach my $p (@{$func->{request}}) {
print PYOUT ",\n";
print PYOUT "\t\t\t ";
- if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque")) {
- print PYOUT "¶m_", $p->{name};
- } elsif ($p->{class} eq "basic") {
+ if ($p->{class} eq "basic") {
print PYOUT "¶m_", $p->{name};
} elsif ($p->{class} eq "struct") {
print PYOUT "&py_", $p->{type}, "Type, ¶m_", $p->{name};
+ } elsif ($p->{class} eq "blob") {
+ print PYOUT "¶m_", $p->{name};
} elsif ($p->{class} eq "bulk") {
print PYOUT "&PyList_Type, ¶m_", $p->{name};
} else {
die $p->{where}, ": Unsupported type \"", $p->{type}, "\"";
}
}
- print PYOUT ",\n\t\t\t &split" if ($func->{split});
+ print PYOUT ",\n\t\t\t &split_callback" if ($func->{split});
print PYOUT "))\n";
print PYOUT "\t\treturn NULL;\n";
if ($func->{split}) {
print PYOUT "\n";
- print PYOUT "\tsplit_call = py_rxgen_split_client_prepare();\n";
- print PYOUT "\tif (!split_call)\n";
+ print PYOUT "\tsplit_info = py_rxgen_split_client_prepare();\n";
+ print PYOUT "\tif (!split_info)\n";
print PYOUT "\t\treturn NULL;\n";
}
print PYOUT "\n";
print PYOUT "\tcall = rxrpc_alloc_call(z_conn->x, 0);\n";
print PYOUT "\tif (!call) {\n";
- print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
+ print PYOUT "\t\tPy_XDECREF(split_info);\n" if ($func->{split});
print PYOUT "\t\treturn PyErr_NoMemory();\n";
print PYOUT "\t}\n";
- print PYOUT "\tpy_rxgen_split_client_set(split_call, call);\n" if ($func->{split});
+ print PYOUT "\tcall->decoder_cleanup = py_rxgen_decoder_cleanup;\n";
+ print PYOUT "\tpy_rxgen_split_client_set(call, split_callback, split_info);\n"
+ if ($func->{split});
# Marshal the arguments
print PYOUT "\n";
print PYOUT "\trxrpc_enc(call, ", $func->{opcode}, ");\n";
foreach my $p (@{$func->{request}}) {
- if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
- $p->{elem}->{class} eq "struct")) {
- print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n";
- print PYOUT "\t\tgoto error;\n";
- } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque")) {
+ if ($p->{class} eq "blob") {
my $dim = -1;
$dim = $p->{dim} if exists $p->{dim};
print PYOUT "\tif (py_enc_buffer(call, ¶m_", $p->{name}, ", ", $dim, ") < 0) {\n";
print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n";
- print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
print PYOUT "\t\treturn NULL;\n";
print PYOUT "\t}\n";
+ } elsif ($p->{class} eq "bulk") {
+ print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n";
+ print PYOUT "\t\tgoto error;\n";
} elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
print PYOUT "\trxrpc_enc(call, param_", $p->{name}, ");\n";
} elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) {
} elsif ($p->{class} eq "struct") {
print PYOUT "\tif (py_premarshal_", $p->{type}, "((PyObject *)param_", $p->{name}, ")) {\n";
print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n";
- print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
print PYOUT "\t\treturn NULL;\n";
print PYOUT "\t}\n";
print PYOUT "\trxgen_encode_", $p->{type}, "(call, ¶m_", $p->{name}, "->x);\n";
# Transmit the split data
if ($func->{split}) {
- print PYOUT "\tif (py_rxgen_split_client_transmit(split, split_call) < 0)\n";
+ print PYOUT "\tif (py_rxgen_split_transmit(call) < 0)\n";
print PYOUT "\t\tgoto error_no_res;\n";
} else {
- print PYOUT "\tcall->more = 0;\n";
+ print PYOUT "\tcall->more_send = 0;\n";
# Make the call
print PYOUT "\n";
# Successful return
print PYOUT "\n";
print PYOUT "\trxrpc_terminate_call(call, 0);\n";
- print PYOUT "\tPy_XDECREF(split_call);\n" if ($func->{split});
print PYOUT "\treturn res;\n";
# Error cleanups
# in the previous phase. Variable-sized bulk arrays are split across
# multiple phases, with the length being at the end of the first phase and
# the data in the second.
+ #
+ # We also need to interpolate a phase to deal with decoding split-op
+ # auxiliary data. This comes last when decoding the request and first when
+ # decoding the response.
+ #
my @phases = ();
my $phase = 0;
my $have_bulk = 0;
my $want_item = 0;
+ if ($func->{split} && $subname eq "response") {
+ $phase = {
+ type => "split",
+ size => "py_rxgen_split_receive(call)",
+ params => []
+ };
+ push @phases, $phase;
+ $phase = 0;
+ $have_bulk = 1;
+ }
+
foreach my $p (@params) {
unless ($phase) {
$phase = { type => "flat", size => 0, params => [] };
) {
$phase->{size} += $p->{xdr_size};
push @{$phase->{params}}, $p;
+ } elsif ($p->{class} eq "blob") {
+ $have_bulk = 1;
+
+ # Bulk objects begin with an element count
+ $phase->{elem_count} = $phase->{size};
+ $phase->{size} += 4;
+
+ my %pseudoparam = (
+ class => "basic",
+ type => "blob_size",
+ name => $p->{name},
+ elem => $p->{elem},
+ where => $p->{where},
+ xdr_size => $p->{xdr_size},
+ );
+ push @{$phase->{params}}, \%pseudoparam;
+
+ # Create a new phase
+ $phase = {
+ type => "blob",
+ name => $p->{name},
+ params => [ $p ],
+ xdr_size => $p->{xdr_size},
+ };
+ push @phases, $phase;
+
+ # We don't want to be asking recvmsg() for one object at a time if
+ # they're really small.
+ $phase->{size} = $p->{xdr_size};
+ $phase = 0;
} elsif ($p->{class} eq "bulk") {
$have_bulk = 1;
};
push @phases, $phase;
- if ($p->{elem}->{class} eq "string" ||
- $p->{elem}->{class} eq "opaque") {
- ;
- } else {
- $want_item = 1;
- }
+ $want_item = 1;
# We don't want to be asking recvmsg() for one object at a time if
# they're really small.
}
}
+ if ($func->{split} && $subname eq "request") {
+ $phase = {
+ type => "split",
+ size => "py_rxgen_split_receive(call)",
+ params => []
+ };
+ push @phases, $phase;
+ $phase = 0;
+ $have_bulk = 1;
+ }
+
# Function definition and arguments
print PYOUT "\n";
print PYOUT "int py_", $func->{name}, "_decode_", $subname, "(struct rx_call *call)\n";
print PYOUT "{\n";
- unless (@params) {
+ unless (@params || $func->{split}) {
print PYOUT "\treturn 0;\n";
print PYOUT "}\n";
return;
}
# Local variables
- print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n";
+ print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n"
+ if (@params);
print PYOUT "\tPyObject *item;\n" if ($want_item);
print PYOUT "\tunsigned phase = call->phase;\n";
print PYOUT "\tunsigned count;\n";
print PYOUT "\n";
print PYOUT "select_phase:\n" if ($have_bulk);
print PYOUT "\tcount = call->data_count;\n";
+ #print PYOUT "\tprintf(\"-- Phase %u (%u) --\\n\", phase, count);\n";
print PYOUT "\tswitch (phase) {\n";
print PYOUT "\tcase 0:\n";
}
# Determine how big bulk objects are
- if ($phase->{type} eq "bulk") {
+ if ($phase->{type} eq "blob") {
my $p = $phase->{params}->[0];
- if ($p->{elem}->{class} eq "basic" ||
- $p->{elem}->{class} eq "struct") {
- print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n";
- print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
- print PYOUT "\t\t\treturn -1;\n";
- } elsif ($p->{elem}->{class} eq "string") {
- print PYOUT "\t\tobj->x.", $p->{name}, " = PyUnicode_New(call->bulk_count, 255);\n";
- print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
- print PYOUT "\t\t\treturn -1;\n";
- print PYOUT "\t\tcall->bulk_item = obj->x.", $p->{name}, ";\n";
+ if ($p->{elem}->{class} eq "string") {
+ print PYOUT "\t\tswitch (py_dec_init_string(call, &obj->x.", $p->{name}, ")) {\n";
} elsif ($p->{elem}->{class} eq "opaque") {
print PYOUT "\t\tobj->x.", $p->{name}, " = PyByteArray_FromStringAndSize(\"\", 0);\n";
print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
print PYOUT "\t\t\treturn -1;\n";
- print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->bulk_count) == -1)\n";
+ print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->blob_size) == -1)\n";
+ print PYOUT "\t\t\treturn -1;\n";
+
+ print PYOUT "\t\tswitch (py_dec_init_opaque(call, obj->x.", $p->{name}, ")) {\n";
+ } else {
+ die;
+ }
+ print PYOUT "\t\tcase -1: return -1;\n";
+ print PYOUT "\t\tcase 0: goto phase_", $phix + 1, ";\n";
+ print PYOUT "\t\tcase 1: break;\n";
+ print PYOUT "\t\t}\n";
+ $phase_goto_label = $phix + 1;
+ } elsif ($phase->{type} eq "bulk") {
+ my $p = $phase->{params}->[0];
+ if ($p->{elem}->{class} eq "basic" ||
+ $p->{elem}->{class} eq "struct") {
+ print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n";
+ print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
print PYOUT "\t\t\treturn -1;\n";
} else {
die;
}
# Entry point for a phase
- print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n";
+ if ($phase->{type} ne "split") {
+ print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n";
+ } else {
+ print PYOUT "\t\tif (py_rxgen_split_receive(call, 1) < 0)\n";
+ print PYOUT "\t\t\treturn -1;\n";
+ print PYOUT "\t\tif (call->need_size == 0)\n";
+ print PYOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+ $phase_goto_label = $phix + 1;
+ }
print PYOUT "\t\tcall->phase = ", $phix, ";\n";
print PYOUT "\tcase ", $phix, ":\n";
- print PYOUT "\t\tif (count < ", $phase->{size}, ")\n";
- print PYOUT "\t\t\treturn 1;\n";
+ if ($phase->{type} ne "split") {
+ print PYOUT "\t\tif (count < call->need_size)\n";
+ print PYOUT "\t\t\treturn 1;\n";
+ } else {
+ print PYOUT "\t\tif (call->need_size == UINT_MAX ? count == 0 : count < call->need_size) {\n";
+ #print PYOUT "\t\t\tprintf(\"NEED %u (phase %u)\\n\", call->need_size, phase);\n";
+ print PYOUT "\t\t\treturn 1;\n";
+ print PYOUT "\t\t}\n";
+ }
# Unmarshal the data
print PYOUT "\n";
foreach my $p (@{$phase->{params}}) {
- if ($p->{type} eq "bulk_size") {
+ if ($p->{type} eq "blob_size") {
+ print PYOUT "\t\tcall->blob_size = rxrpc_dec(call);\n";
+ next;
+ } elsif ($p->{type} eq "bulk_size") {
print PYOUT "\t\tcall->bulk_count = rxrpc_dec(call);\n";
next;
}
- if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
- $p->{elem}->{class} eq "struct")
- ) {
+ if ($p->{class} eq "bulk") {
if ($p->{elem}->{class} eq "struct") {
print PYOUT "\t\titem = py_decode_", $p->{type}, "(call);\n";
} elsif ($p->{elem}->{xdr_size} == 4 && $p->{type} =~ /^u/) {
print PYOUT "\t\t\treturn -1;\n";
print PYOUT "\t\tcall->bulk_index++;\n";
- } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") {
- print PYOUT "\t\tpy_dec_string(call);\n";
- } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") {
- print PYOUT "\t\tpy_dec_opaque(call, obj->x.", $p->{name}, ");\n";
+ } elsif ($p->{class} eq "blob") {
+ if ($p->{elem}->{class} eq "string") {
+ print PYOUT "\t\tswitch (py_dec_into_string(call)) {\n";
+ } else {
+ print PYOUT "\t\tswitch (py_dec_into_buffer(call)) {\n";
+ }
+ print PYOUT "\t\tcase -1: return -1;\n";
+ print PYOUT "\t\tcase 0: break;\n";
+ print PYOUT "\t\tcase 1: phase = ", $phix, "; goto select_phase;\n";
+ print PYOUT "\t\t}\n";
} elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
print PYOUT "\t\tobj->x.", $p->{name}, " = (", $p->{type}, ")rxrpc_dec(call);\n";
} elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) {
die $p->{where}, ": Unsupported type in decode";
}
- if ($p->{class} eq "bulk") {
+ if ($p->{class} eq "blob" && $p->{elem}->{class} eq "string") {
+ print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+ print PYOUT "\t\t\treturn -1;\n";
+ print PYOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+ print PYOUT "\t\t\tphase = ", $phix, ";\n";
+ print PYOUT "\t\t\tgoto select_phase;\n";
+ print PYOUT "\t\t}\n";
+ } elsif ($p->{class} eq "bulk") {
print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
print PYOUT "\t\t\treturn -1;\n";
print PYOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
}
}
- if ($phase->{type} ne "bulk") {
+ if ($phase->{type} eq "split") {
+ print PYOUT "\t\tswitch (py_rxgen_split_receive(call, 0)) {\n";
+ print PYOUT "\t\tcase -1: return -1;\n";
+ print PYOUT "\t\tcase 0: break;\n";
+ print PYOUT "\t\tcase 1: phase = ", $phix, "; goto select_phase;\n";
+ print PYOUT "\t\t}\n";
+ #print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+ #print PYOUT "\t\t\treturn -1;\n";
+ #print PYOUT "\t\tif (call->need_size != 0) {\n";
+ #print PYOUT "\t\t\tphase = ", $phix, ";\n";
+ #print PYOUT "\t\t\tgoto select_phase;\n";
+ #print PYOUT "\t\t}\n";
+ }
+
+ if ($phase->{type} ne "bulk" && $phase->{type} ne "blob") {
print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
print PYOUT "\t\t\treturn -1;\n";
}
#
# Each type is specified by a hash of the following elements:
#
-# class Complexity class (basic, string, struct, array, bulk)
+# class Complexity class (basic, string, struct, blob, array, bulk)
# type Basic/struct type (char, {u,}int{8,16,32,64}_t, opaque, struct name)
# elem Ref to element type def (if array/bulk)
# multi 1 if array or bulk type, 0 otherwise
my %combined = %{$type};
+ if (exists $flags->{class} && $flags->{class} eq "bulk") {
+ if ($type->{class} eq "string" ||
+ $type->{class} eq "opaque") {
+ $flags->{class} = "blob";
+ }
+ }
+
if (exists $flags->{class} &&
- ($flags->{class} eq "bulk" ||
+ ($flags->{class} eq "blob" ||
+ $flags->{class} eq "bulk" ||
$flags->{class} eq "array")) {
die $where, ": Typedef'ing array/bulk as array/bulk not supported\n"
if ($type->{multi});
if ($line =~ /([a-zA-Z_][a-zA-Z0-9_]*) ([a-zA-Z_][a-zA-Z0-9_]*);/) {
my %member = %{look_up_type($1)};
die $where, ": Don't support bulk constructs in structs\n"
- if ($member{class} eq "bulk");
+ if ($member{class} eq "bulk" || $member{class} eq "blob");
$member{name} = $2;
$member{where} = $where;
push $struct->{members}, \%member;
if ($bulk_dim) {
die $where, ": Bulk-of-bulk parameters not supported\n"
if ($param{class} eq "bulk");
- $param{class} = "bulk";
+ if ($type->{class} eq "string" ||
+ $type->{class} eq "opaque") {
+ $param{class} = "blob";
+ } else {
+ $param{class} = "bulk";
+ }
$param{elem} = $type;
$param{dim} = $bulk_dim unless $bulk_dim eq -1;
}
;
} elsif ($p->{class} eq "struct") {
die unless (exists $p->{xdr_size});
+ } elsif ($p->{class} eq "blob") {
+ die $p->{where}, ": No element type" unless (exists $p->{elem});
+ if (exists $p->{dim}) {
+ die $where, ": Missing constant ", $p->{dim} unless exists $constants{$p->{dim}};
+ }
} elsif ($p->{class} eq "bulk") {
die $p->{where}, ": No element type" unless (exists $p->{elem});
die $p->{where}, ": Element has no XDR size" unless (exists $p->{elem}->{xdr_size});
--- /dev/null
+#
+# AFS Server management toolkit: Server log fetcher
+# -*- coding: utf-8 -*-
+#
+
+__copyright__ = """
+Copyright (C) 2014 Red Hat, Inc. All Rights Reserved.
+Written by David Howells (dhowells@redhat.com)
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public Licence version 2 as
+published by the Free Software Foundation.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public Licence for more details.
+
+You should have received a copy of the GNU General Public Licence
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+
+from afs.exception import AFSException
+from afs.argparse import *
+from afs.lib.output import *
+import kafs
+import os
+
+help = "Print a server process's log file"
+
+command_arguments = [
+ [ "server", get_bosserver, "rs", "<machine name>" ],
+ [ "file", get_file_name, "rs", "<log file to examine>" ],
+ [ "cell", get_cell, "os", "<cell name>" ],
+ [ "noauth", get_auth, "fn" ],
+ [ "localauth", get_auth, "fn" ],
+ [ "verbose", get_verbose, "fn" ],
+ [ "encrypt", get_dummy, "fn" ],
+]
+
+cant_combine_arguments = [
+ ( "cell", "localauth" ),
+ ( "noauth", "localauth" ),
+]
+
+argument_size_limits = {
+ "file" : kafs.BOZO_BSSIZE,
+}
+
+description = r"""
+Print a server process's log file
+"""
+
+class split_handler:
+ def __init__(self):
+ pass
+
+ def transmit(self, split_info):
+ return None
+
+ # Receive state machine. Returns True if more data is required to be read
+ # or another state must be transited to and False if the operation is now
+ # complete.
+ #
+ def receive(self, split_info):
+ # We want to receive everything we can - the entire response phase
+ # belongs to the split handler
+ if split_info.state == 0:
+ split_info.will_recv_all()
+ split_info.state = 1
+ return True
+
+ if split_info.state == 1:
+ avail = split_info.data_available()
+ if avail == None:
+ # Last byte read
+ return False
+ if avail == 0:
+ split_info.will_recv_all()
+ return True
+
+ if avail > 4096:
+ avail = 4096
+ buf = bytearray(avail)
+ split_info.target = buf
+ split_info.state = 2
+
+ # Request reception start. This function will be reentered with
+ # phase incremented when it's done. receive_failed() will be
+ # called instead upon failure.
+ return split_info.begin_recv(buf, False)
+
+ if split_info.state == 3:
+ buf = split_info.target
+ split_info.target = None
+
+ # Strip any terminal NUL chars
+ buf = buf.rstrip(b"\0")
+ output_raw(buf)
+ del buf
+
+ split_info.state = 1
+ return True
+
+ raise AFSException("Unexpected receive phase ", split_info.phase,
+ " in getlog() split.receive()")
+
+ def receive_failed(self, split_info):
+ print("Receive failed in phase", split_info.phase)
+ split_info.target = None
+
+def main(params):
+ cell = params["cell"]
+ bos_conn = cell.open_bos_server(params["server"], params)
+
+ split = split_handler()
+
+ output("Fetching log file '", params["file"], "'...\n")
+ output_flush()
+ ret = kafs.BOZO_GetLog(bos_conn, params["file"], split)
self.__size = size
self.__pos = 0
- def client_transmit(self, split_call):
+ def transmit(self, split_info):
while self.__pos < self.__size:
verbose("--- XMIT ", self.__pos, "/", self.__size, "\n")
size = self.__size - self.__pos
buf = self.__fd.read(size)
if len(buf) < size:
raise IOError("Short read on file " + self.__name)
- split_call.send(buf, self.__pos < self.__size)
+ split_info.send(buf, self.__pos < self.__size)
return True
- def client_receive(self, split_call):
- return True
+ def receive(self, split_info, inited):
+ return None
def main(params):
cell = params["cell"]
for i in args:
sys.stdout.write(str(i))
+def output_raw(*args):
+ for i in args:
+ sys.stdout.buffer.write(i)
+
+def output_flush():
+ sys.stdout.flush()
+
def outputf(formatstr, *args):
sys.stdout.write(formatstr.format(*args))
except KeyboardInterrupt:
sys.exit(1)
except SystemExit:
- pass
+ sys.exit(0)
except:
print('Unhandled exception:', file=sys.stderr)
traceback.print_exc()