#include <unistd.h>
 #include <poll.h>
 #include <errno.h>
+#include <limits.h>
 #include <sys/socket.h>
 #include "af_rxrpc.h"
 #include "rxgen.h"
 
 #define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
 
+uint32_t rxgen_dec_padding_sink;
+
 /*
  * dump the control messages
  */
 
        call->magic = RXGEN_CALL_MAGIC;
        call->conn = z_conn;
-       call->more = 1;
+       call->more_send = true;
+       call->more_recv = true;
        call->buffer_head = buf;
        call->buffer_tail = buf;
        call->data_start = data;
                iov[ioc].iov_base = cursor->buf + io_cursor;
                iov[ioc].iov_len = end - io_cursor;
                if (cursor == call->buffer_tail) {
-                       if (!call->more)
+                       if (!call->more_send)
                                more = 0;
                        ioc++;
                        break;
 
        if (call->data_count > 0)
                goto more_to_send;
-       if (call->data_count == 0 && !call->more)
+       if (call->data_count == 0 && !call->more_send)
                call->state++;
 
-       if (!call->more) {
+       if (!call->more_send) {
                if (call->state == rx_call_cl_encoding_params ||
                    call->state == rx_call_sv_encoding_response)
                        call->state++;
        msg.msg_flags   = 0;
 
        ret = recvmsg(z_conn->fd, &msg, MSG_PEEK | (nowait ? MSG_DONTWAIT : 0));
-       debug("RECVMSG: %d\n", ret);
+       debug("RECVMSG PEEK: %d\n", ret);
        if (ret == -1)
                return -1;
 
        if (ret == -1)
                return -1;
 
-       debug("RECV: %d [fl:%d]\n", ret, msg.msg_flags);
+       debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags);
        debug("CMSG: %zu\n", msg.msg_controllen);
        debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len);
 
        rxrpc_check_call(call);
 
        for (cursor = call->buffer_head; cursor; cursor = cursor->next)
-               debug("Recv buf=%p data=%p ioc=%u\n",
+               debug("RecvQ buf=%p data=%p iocur=%u\n",
                      cursor, cursor->buf, cursor->io_cursor);
 
        /* Process the metadata */
        if (msg.msg_flags & MSG_EOR)
                call->known_to_kernel = 0;
        if (!(msg.msg_flags & MSG_MORE))
-               call->more = 0;
+               call->more_recv = false;
 
        for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
                unsigned char *p;
        case rx_call_cl_decoding_response:
        case rx_call_sv_decoding_opcode:
        case rx_call_sv_decoding_params:
-               if (call->data_count < call->need_size) {
+               if (call->need_size == UINT_MAX) {
+                       /* A split mode call wants everything as it comes in,
+                        * and is willing to settle for nothing also.
+                        */
+                       if (call->data_count <= 0 && call->more_recv)
+                               return 0;
+               } else if (call->data_count < call->need_size) {
                        if (!(msg.msg_flags & MSG_MORE)) {
                                /* Short data */
-                               call->error_code = EMSGSIZE;
+                               call->error_code = ENODATA;
                                rxrpc_abort_call(call, 1);
                        }
                        return 0;
        unsigned char control[128];
 
        if (call->known_to_kernel) {
+               memset(control, 0, sizeof(control));
                ctrllen = 0;
                RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
                RXRPC_ADD_ABORT(control, ctrllen, abort_code);
                free(cursor->buf);
                free(cursor);
        }
+       if (call->decoder_cleanup)
+               call->decoder_cleanup(call);
        free(call);
 }
 
        }
 }
 
-/*
- * Slow path for decoding from data read from the buffer list.
- */
-void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data)
-{
-       struct rx_buf *cursor, *new;
-       uint8_t *buf;
-
-       rxrpc_check_call(call);
-
-       if (call->error_code)
-               return;
-       if (call->data_cursor != call->data_stop)
-               abort();
-       rxrpc_post_enc(call);
-
-       new = calloc(1, sizeof(struct rx_buf));
-       if (!new)
-               goto handle_oom;
-       new->buf = buf = malloc(RXGEN_BUFFER_SIZE);
-       if (!buf)
-               goto handle_oom_buf;
-       new->magic = RXGEN_BUF_MAGIC;
-
-       *(net_xdr_t *)buf = data;
-
-       cursor = call->buffer_tail;
-       cursor->next = new;
-       call->data_cursor = call->data_start = buf + sizeof(net_xdr_t);
-       call->data_stop = buf + RXGEN_BUFFER_SIZE;
-       call->buffer_space += RXGEN_BUFFER_SIZE - sizeof(net_xdr_t);
-       call->buffer_tail = new;
-       return;
-
-handle_oom_buf:
-       new->magic = RXGEN_BUF_DEAD;
-       free(new);
-handle_oom:
-       call->error_code = ENOMEM;
-}
-
-/*
- * Slow path for decoding from data read from the buffer list.
- */
-uint32_t rxrpc_dec_slow(struct rx_call *call)
-{
-       struct rx_buf *cursor, *spent;
-       net_xdr_t x;
-       unsigned count;
-       uint8_t *new_stop;
-
-       rxrpc_check_call(call);
-       rxrpc_post_dec(call);
-
-       /* More data may have come into a partially filled buffer from which we
-        * previously read.
-        */
-       cursor = call->buffer_head;
-       count = cursor->io_cursor & ~3;
-       new_stop = cursor->buf + count;
-       if (call->data_stop >= new_stop) {
-               /* This buffer must then be completely used as we're required to check
-                * amount received before reading it.
-                */
-               if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
-                       abort(); /* Didn't completely consume a buffer */
-               if (call->buffer_tail == cursor)
-                       abort(); /* Unexpectedly out of data */
-
-               /* Move to the next buffer */
-               spent = cursor;
-               cursor = cursor->next;
-               call->buffer_head = cursor;
-
-               call->data_cursor = call->data_start = cursor->buf;
-               count = cursor->io_cursor & ~3;
-               if (count == 0)
-                       abort();
-               new_stop = cursor->buf + count;
-               spent->magic = RXGEN_BUF_DEAD;
-               free(spent->buf);
-               free(spent);
-       }
-
-       call->data_stop = new_stop;
-       x = *(net_xdr_t *)call->data_cursor;
-       call->data_cursor += sizeof(x);
-       return ntohl(x);
-}
-
 /*
  * Discard excess received data
  */
 /*
  * Encode a string of bytes
  */
-int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size)
+int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size)
 {
        struct rx_buf *cursor, *new;
        uint8_t *buf;
 }
 
 /*
- * Decode a string of bytes
+ * Slow path for encoding: append a single net_xdr_t word to the call's
+ * buffers by handing it to the blob encoder.
  */
-void rxrpc_dec_bytes(struct rx_call *call)
+void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data)
 {
-       struct rx_buf *cursor, *spent;
-       unsigned needed, count;
-       uint8_t *new_stop;
+       rxrpc_enc_blob(call, &data, sizeof(data));
+}
+
+/*
+ * Decode a blob.
+ *
+ * This works progressively and so may need to be called multiple times for any
+ * particular set.
+ *
+ * call->blob_size indicates how many bytes we need to extract in total and
+ * call->blob_offset indicates how many we have done so far.  blob_offset is
+ * updated before returning.
+ */
+void rxrpc_dec_blob(struct rx_call *call)
+{
+       unsigned needed, segment;
 
        rxrpc_check_call(call);
        rxrpc_post_dec(call);
 
-       needed = call->bulk_count - call->bulk_index;
-       count = call->data_stop - call->data_cursor;
-       if (count > 0) {
-               if (count > needed)
-                       count = needed;
-               memcpy(call->bulk_item + call->bulk_index, call->data_cursor, count);
-               call->bulk_index += count;
-               call->data_cursor += count;
-               if (call->bulk_index >= call->bulk_count)
+       /* needed:  bytes of the blob that have not been extracted yet.
+        * segment: bytes lying between data_cursor and data_stop.
+        */
+       needed = call->blob_size - call->blob_offset;
+       segment = call->data_stop - call->data_cursor;
+       debug("DEC BLOB dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment);
+
+       if (segment > 0) {
+               /* Never copy beyond the end of the blob */
+               if (segment > needed)
+                       segment = needed;
+
+               debug("DEC BLOB copy %u\n", segment);
+               memcpy(call->blob + call->blob_offset, call->data_cursor, segment);
+               call->blob_decoded += segment;
+               call->blob_offset += segment;
+               call->data_cursor += segment;
+               if (call->blob_offset >= call->blob_size)
                        return;
        }
 
+       /* The blob is still incomplete (or nothing was available to copy):
+        * progress the buffer chain before returning.
+        */
+       rxrpc_dec_advance_buffer(call);
+}
+
+/*
+ * Progress the buffer chain during decoding.
+ */
+void rxrpc_dec_advance_buffer(struct rx_call *call)
+{
+       struct rx_buf *cursor, *spent;
+       unsigned segment;
+       uint8_t *new_stop;
+
        /* More data may have come into a partially filled buffer from which we
         * previously read.
         */
        cursor = call->buffer_head;
-       count = cursor->io_cursor & ~3;
-       new_stop = cursor->buf + count;
+       segment = cursor->io_cursor;
+       new_stop = cursor->buf + segment;
        if (call->data_stop >= new_stop) {
                /* This buffer must then be completely used as we're required to check
                 * amount received before reading it.
                        abort(); /* Unexpectedly out of data */
 
                /* Move to the next buffer */
+               rxrpc_post_dec(call);
                spent = cursor;
                cursor = cursor->next;
                call->buffer_head = cursor;
 
                call->data_cursor = call->data_start = cursor->buf;
-               count = cursor->io_cursor & ~3;
-               if (count == 0)
+               segment = cursor->io_cursor;
+               if (segment == 0)
                        abort();
-               new_stop = cursor->buf + count;
+               new_stop = cursor->buf + segment;
 
                spent->magic = RXGEN_BUF_DEAD;
                free(spent->buf);
        call->data_stop = new_stop;
        rxrpc_check_call(call);
 }
+
+/*
+ * Slow path for decoding a 4-byte integer read from the buffer list.
+ *
+ * The word is gathered through rxrpc_dec_blob() into a local, so the call's
+ * blob_size/blob_offset/blob fields are overwritten.  The caller must already
+ * have checked that enough data is buffered; if call->data_count is smaller
+ * than the word size this aborts.
+ *
+ * Returns the decoded value in host byte order.
+ */
+uint32_t rxrpc_dec_slow(struct rx_call *call)
+{
+       net_xdr_t x;
+       uint32_t ret;
+
+       debug("DEC SLOW %lu %td\n",
+             (unsigned long)call->data_cursor & 3,
+             call->data_cursor - call->data_stop);
+
+       /* Size the blob from the destination so the copy can never overrun
+        * the local it lands in.
+        */
+       call->blob_size = sizeof(x);
+       call->blob_offset = 0;
+       call->blob = &x;
+
+       /* We should only come here if there is sufficient data in the
+        * buffers for us to complete all the copy operation(s).
+        */
+       if (call->data_count < call->blob_size)
+               abort();
+
+       while (call->blob_offset < call->blob_size)
+               rxrpc_dec_blob(call);
+
+       /* x is about to go out of scope: don't leave the call holding a
+        * pointer into this stack frame.
+        */
+       call->blob = NULL;
+
+       ret = ntohl(x);
+       debug("DEC SLOW = %x\n", ret);
+       return ret;
+}
 
 #include "afs_py.h"
 #include "rxgen.h"
 
+#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+
+/*
+ * Decode cursor for one dimension of a buffer view.
+ */
+struct py_dec_index {
+       unsigned long val;      /* Current index within this dimension */
+       void *ptr;              /* Cached pointer computed for this dimension at val */
+};
+
+/*
+ * State kept in call->decoder_manager while decoding into a Python buffer.
+ *
+ * view holds a copy of the Py_buffer being filled; indices[] has one entry
+ * per dimension of that view.
+ *
+ * NOTE(review): the decoders address the base pointer as indices[-1], which
+ * presumably relies on top sitting immediately before indices[] - keep the
+ * two members adjacent and in this order.
+ */
+struct py_dec_manager {
+       Py_buffer view;
+       struct py_dec_index top;
+       struct py_dec_index indices[];
+};
+
 PyObject *py_rxgen_get_struct(const void *p, PyObject **cache,
                              PyObject *(*data_to_type)(const void *elem))
 {
        return 0;
 }
 
-/*
- * Decode a string of bytes into a preallocated unicode string python object.
- */
-void py_dec_string(struct rx_call *call)
-{
-       PyObject *str = call->bulk_item;
-       struct rx_buf *cursor, *spent;
-       unsigned needed, count;
-       uint8_t *new_stop;
-
-       if (PyUnicode_KIND(str) != PyUnicode_1BYTE_KIND)
-               abort();
-
-       rxrpc_post_dec(call);
-
-       needed = call->bulk_count - call->bulk_index;
-       count = call->data_stop - call->data_cursor;
-       if (count > 0) {
-               if (count > needed)
-                       count = needed;
-               memcpy(PyUnicode_DATA(str) + call->bulk_index, call->data_cursor, count);
-               call->bulk_index += count;
-               call->data_cursor += count;
-               rxrpc_dec_align(call);
-               if (call->bulk_count <= call->bulk_index)
-                       return;
-               
-       }
-
-       /* More data may have come into a partially filled buffer from which we
-        * previously read.
-        */
-       cursor = call->buffer_head;
-       count = cursor->io_cursor & ~3;
-       new_stop = cursor->buf + count;
-       if (call->data_stop >= new_stop) {
-               /* This buffer must then be completely used as we're required to check
-                * amount received before reading it.
-                */
-               if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
-                       abort(); /* Didn't completely consume a buffer */
-               if (call->buffer_tail == cursor)
-                       abort(); /* Unexpectedly out of data */
-
-               /* Move to the next buffer */
-               spent = cursor;
-               cursor = cursor->next;
-               call->buffer_head = cursor;
-
-               call->data_cursor = call->data_start = cursor->buf;
-               count = cursor->io_cursor & ~3;
-               if (count == 0)
-                       abort();
-               new_stop = cursor->buf + count;
-
-               free(spent->buf);
-               free(spent);
-       }
-
-       call->data_stop = new_stop;
-}
-
 /*
  * Recursively encode a standard C array.
  */
 static int py_enc_c_array(struct rx_call *call,
                          const void *data, int dim,
                          const Py_ssize_t *dim_size,
-                         Py_ssize_t itemsize,
-                         int align)
+                         Py_ssize_t itemsize)
 {
        if (dim == 1) {
                /* Data subarray */
-               rxrpc_enc_bytes(call, data, *dim_size * itemsize);
-               if (align)
-                       rxrpc_enc_align(call);
+               rxrpc_enc_blob(call, data, *dim_size * itemsize);
                return rxrpc_post_enc(call);
        } else {
                /* Pointer subarray */
                const void *const *ptrs = data;
                Py_ssize_t i;
                for (i = 0; i < *dim_size; i++)
-                       if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize,
-                                          align) < 0)
+                       if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0)
                                return -1;
                return 0;
        }
 
        if (dim == 0)
                /* Single data item */
-               return rxrpc_enc_bytes(call, data, itemsize);
+               return rxrpc_enc_blob(call, data, itemsize);
 
        for (i = 0; i < *dim_size; i++) {
                if (py_enc_numpy_array(call, data + i * *dim_stride,
 
        if (dim == 0)
                /* Single data item */
-               return rxrpc_enc_bytes(call, data, itemsize);
+               return rxrpc_enc_blob(call, data, itemsize);
 
        for (i = 0; i < *dim_size; i++) {
                const void *ptr = data + i * *dim_stride;
  * Encode the just the contents of a python buffer view, without length or
  * realignment.
  */
-int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align)
+int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim)
 {
        if (call->error_code)
                return -1;
 
        if (view->ndim == 0 || (view->ndim == 1 && !view->shape)) {
                /* Simple scalar array */
-               if (rxrpc_enc_bytes(call, view->buf, view->len) < 0)
+               if (rxrpc_enc_blob(call, view->buf, view->len) < 0)
                        return -1;
                return rxrpc_post_enc(call);
        }
 
        if (!view->strides)
                /* Standard C array */
-               return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize,
-                                     align);
+               return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize);
 
        if (!view->suboffsets)
                return py_enc_numpy_array(call, view->buf, view->ndim, view->shape, view->strides,
  */
 int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim)
 {
+       static unsigned zero;
        int i;
 
        if (call->error_code)
                return -1;
 
        if (0) {
-               printf("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize);
+               debug("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize);
                if (view->shape)
                        for (i = 0; i < view->ndim; i++)
-                               printf(" %zd", view->shape[i]);
-               printf(" }\n");
+                               debug(" %zd", view->shape[i]);
+               debug(" }\n");
        }
 
        if (view->len > dim) {
        }
 
        rxrpc_enc(call, view->len);
-       if (py_enc_buffer_raw(call, view, dim, true) == -1)
+       if (py_enc_buffer_raw(call, view, dim) == -1)
                return -1;
 
-       rxrpc_enc_align(call);
+       if (view->len & 3)
+               rxrpc_enc_blob(call, &zero, 4 - (view->len & 3));
        return rxrpc_post_enc(call);
 }
 
 /*
- * Recursively decode a standard C array.
+ * Recursively decode into a standard C array.
  */
-static int py_dec_c_array(struct rx_call *call,
-                         void *data, int dim,
-                         const Py_ssize_t *dim_size,
-                         Py_ssize_t itemsize)
+static int py_dec_c_array(struct rx_call *call, struct py_dec_manager *manager)
 {
-       if (dim == 1) {
-               /* Data subarray */
-               call->bulk_item = data;
-               call->bulk_count = *dim_size * itemsize;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               return rxrpc_post_dec(call);
-       } else {
-               /* Pointer subarray */
-               void **ptrs = data;
-               Py_ssize_t i;
-               for (i = 0; i < *dim_size; i++)
-                       if (py_dec_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0)
-                               return -1;
-               return 0;
+       unsigned long val;
+       int i;
+
+       /* First of all, we finish off the blob we're currently decoding
+        * into.
+        */
+       if (call->blob_size < manager->view.itemsize) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) < 0)
+                       return -1;
+               if (call->blob_size < manager->view.itemsize)
+                       goto need_more;
+       }
+
+       /* Now we increment the per-dimension counters, overflowing and
+        * wrapping when a counter reaches the limit.  When the outermost
+        * wraps, we are done.
+        */
+       for (i = manager->view.ndim - 1; i >= 0; i--) {
+               val = manager->indices[i].val;
+               if (val + 1 < manager->view.shape[i]) {
+                       manager->indices[i].val = val + 1;
+                       goto incremented;
+               }
+               manager->indices[i].val = 0;
        }
+
+       return 0; /* Complete */
+
+incremented:
+       /* Recalculate the cached pointers */
+       for (; i <= manager->view.ndim - 1; i++) {
+               void **ptrs = manager->indices[i - 1].ptr;
+               void *ptr = ptrs[manager->indices[i].val];
+               manager->indices[i].ptr = ptr;
+       }
+
+       call->blob_offset = 0;
+       call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+       return 1;
 }
 
 /*
- * Recursively decode a NumPy-style array.
+ * Recursively decode into a NumPy-style array.
  */
-static int py_dec_numpy_array(struct rx_call *call,
-                             void *data, int dim,
-                             const Py_ssize_t *dim_size,
-                             const Py_ssize_t *dim_stride,
-                             Py_ssize_t itemsize)
+static int py_dec_numpy_array(struct rx_call *call, struct py_dec_manager *manager)
 {
+       unsigned long val;
        int i;
 
-       if (dim == 0) {
-               /* Single data item */
-               call->bulk_item = data;
-               call->bulk_count = itemsize;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               return 0;
+       /* First of all, we finish off the blob we're currently decoding
+        * into.
+        */
+       if (call->blob_size < manager->view.itemsize) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) < 0)
+                       return -1;
+               if (call->blob_size < manager->view.itemsize)
+                       goto need_more;
        }
 
-       for (i = 0; i < *dim_size; i++) {
-               if (py_dec_numpy_array(call, data + i * *dim_stride,
-                                      dim - 1, dim_size + 1, dim_stride + 1, itemsize) < 0)
-                       return -1;
-               if (dim == 1 && rxrpc_post_dec(call) < 0)
-                       return -1;
+       /* Now we increment the per-dimension counters, overflowing and
+        * wrapping when a counter reaches the limit.  When the outermost
+        * wraps, we are done.
+        */
+       for (i = manager->view.ndim - 1; i >= 0; i--) {
+               val = manager->indices[i].val;
+               if (val + 1 < manager->view.shape[i]) {
+                       manager->indices[i].val = val + 1;
+                       goto incremented;
+               }
+               manager->indices[i].val = 0;
        }
-       return 0;
+
+       return 0; /* Complete */
+
+incremented:
+       /* Recalculate the cached pointers */
+       for (; i <= manager->view.ndim - 1; i++) {
+               void *ptr = manager->indices[i - 1].ptr;
+               ptr += manager->indices[i].val * manager->view.strides[i];
+               manager->indices[i].ptr = ptr;
+       }
+
+       call->blob_offset = 0;
+       call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+       return 1;
 }
 
 /*
- * Recursively decode a Python Imaging Library (PIL)-style array.
+ * Recursively decode into a Python Imaging Library (PIL)-style array.
  */
-static int py_dec_pil_array(struct rx_call *call,
-                           void *data, int dim,
-                           const Py_ssize_t *dim_size,
-                           const Py_ssize_t *dim_stride,
-                           const Py_ssize_t *dim_suboffset,
-                           Py_ssize_t itemsize)
+static int py_dec_pil_array(struct rx_call *call, struct py_dec_manager *manager)
 {
+       unsigned long val;
        int i;
 
-       if (dim == 0) {
-               /* Single data item */
-               call->bulk_item = data;
-               call->bulk_count = itemsize;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               return 0;
+       /* First of all, we finish off the blob we're currently decoding
+        * into.
+        */
+       if (call->blob_size < manager->view.itemsize) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) < 0)
+                       return -1;
+               if (call->blob_size < manager->view.itemsize)
+                       goto need_more;
        }
 
-       for (i = 0; i < *dim_size; i++) {
-               void *ptr = data + i * *dim_stride;
-               if (*dim_suboffset >= 0) {
+       /* Now we increment the per-dimension counters, overflowing and
+        * wrapping when a counter reaches the limit.  When the outermost
+        * wraps, we are done.
+        */
+       for (i = manager->view.ndim - 1; i >= 0; i--) {
+               val = manager->indices[i].val;
+               if (val + 1 < manager->view.shape[i]) {
+                       manager->indices[i].val = val + 1;
+                       goto incremented;
+               }
+               manager->indices[i].val = 0;
+       }
+
+       return 0; /* Complete */
+
+incremented:
+       /* Recalculate the cached pointers */
+       for (; i <= manager->view.ndim - 1; i++) {
+               void *ptr = manager->indices[i - 1].ptr;
+               ptr += manager->indices[i].val * manager->view.strides[i];
+               if (manager->view.suboffsets[i] >= 0) {
                        ptr = *((void *const *)ptr);
-                       ptr += *dim_suboffset;
+                       ptr += manager->view.suboffsets[i];
                }
-               if (py_dec_pil_array(call, ptr,
-                                    dim - 1, dim_size + 1, dim_stride + 1, dim_suboffset + 1,
-                                    itemsize) < 0)
-                       return -1;
-               if (dim == 1 && rxrpc_post_dec(call) < 0)
-                       return -1;
+               manager->indices[i].ptr = ptr;
        }
-       return 0;
+
+       call->blob_offset = 0;
+       call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+       return 1;
 }
 
 /*
  * Decode the contents of an opaque type
  */
-int py_dec_opaque(struct rx_call *call, PyObject *obj)
+int py_dec_into_buffer(struct rx_call *call)
 {
-       Py_buffer view;
-       int i;
+       struct py_dec_manager *manager = call->decoder_manager;
+       int ret;
 
        if (call->error_code)
                return -1;
 
-       if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0)
-               return -1;
+       if (call->blob == &rxgen_dec_padding_sink) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) == -1)
+                       return -1;
+               return (call->blob_offset == call->blob_size) ? 0 : 1;
+       }
 
-       if (0) {
-               printf("PEBUF: l=%zu isz=%zd {", view.len, view.itemsize);
-               if (view.shape)
-                       for (i = 0; i < view.ndim; i++)
-                               printf(" %zd", view.shape[i]);
-               printf(" }\n");
+       if (manager->view.ndim == 0 || (manager->view.ndim == 1 && !manager->view.shape)) {
+               /* Single scalar */
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) == -1)
+                       return -1;
+               ret = (call->blob_offset == call->blob_size) ? 0 : 1;
+       } else if (!manager->view.strides)
+               ret = py_dec_c_array(call, manager);
+       else if (!manager->view.suboffsets)
+               ret = py_dec_numpy_array(call, manager);
+       else
+               ret = py_dec_pil_array(call, manager);
+
+       if (ret <= 0) {
+               PyBuffer_Release(&manager->view);
+               free(call->decoder_manager);
+               call->decoder_manager = NULL;
        }
 
-       if (view.ndim == 0 || (view.ndim == 1 && !view.shape)) {
-               /* Simple scalar array */
-               call->bulk_item = view.buf;
-               call->bulk_count = view.len;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               if (rxrpc_post_dec(call) < 0)
-                       goto error;
-               goto done;
+       if (ret == 0 && call->padding_size > 0) {
+               /* Soak up the padding to a 32-bit boundary */
+               call->blob = &rxgen_dec_padding_sink;
+               call->blob_size = 4 - (call->blob_size & 3);
+               call->blob_offset = 0;
+               return 1;
+       }
+
+       return ret;
+}
+
+/*
+ * Initialise buffer decode.
+ *
+ * Any decode state still attached to the call is discarded first.  The
+ * caller's view is consumed: it is released here when 0 or -1 is returned,
+ * otherwise it is copied into the new manager stored in
+ * call->decoder_manager.
+ *
+ * If padded is true, call->padding_size is set to the number of bytes needed
+ * to round view->len up to a multiple of 4 (0 if it already is one).
+ *
+ * Returns 0 if there's nothing to do, 1 if the decode is set up and -1 on
+ * error.
+ */
+int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded)
+{
+       struct py_dec_manager *manager = NULL;
+       size_t size;
+       void *ptr;
+       int i;
+
+       /* Drop whatever an earlier, unfinished decode left behind */
+       if (call->decoder_manager) {
+               manager = call->decoder_manager;
+               PyBuffer_Release(&manager->view);
+               free(manager);
+               manager = NULL;
+               call->decoder_manager = NULL;
+       }
+
+       debug("INIT_BUFFER: l=%zd isz=%zd nd=%d\n", view->len, view->itemsize, view->ndim);
+       call->need_size = view->len;
+       call->padding_size = 0; /* Don't inherit a previous buffer's padding */
+       if (view->len == 0) {
+               PyBuffer_Release(view);
+               return 0;
+       }
+
+       /* The encoder only pads when the length is not a multiple of 4, so
+        * an aligned length must yield 0 here, not 4.
+        */
+       if (padded)
+               call->padding_size = (4 - (view->len & 3)) & 3;
+
+       size = sizeof(struct py_dec_manager);
+       size += view->ndim * sizeof(struct py_dec_index);
+       manager = malloc(size);
+       if (!manager) {
+               PyBuffer_Release(view);
+               PyErr_NoMemory();
+               return -1;
        }
 
-       if (!view.strides) {
+       memset(manager, 0, size);
+       memcpy(&manager->view, view, sizeof(*view));
+
+       /* The base pointer lives in top, which the array decoders reach as
+        * indices[-1].  ptr then walks down to the first element; every
+        * index starts at 0 thanks to the memset above.
+        */
+       ptr = manager->view.buf;
+       manager->top.ptr = ptr;
+
+       call->blob_size = manager->view.itemsize;
+
+       if (manager->view.ndim == 0) {
+               call->blob_size = manager->view.len;
+       } else if (manager->view.ndim == 1 && !manager->view.shape) {
+               manager->indices[0].ptr = ptr;
+               call->blob_size = manager->view.len;
+       } else if (!manager->view.strides) {
                /* Standard C array */
-               if (py_dec_c_array(call, view.buf, view.ndim, view.shape, view.itemsize) < 0)
-                       goto error;
-               goto done;
+               for (i = 0; i < manager->view.ndim; i++) {
+                       ptr = ((void **)ptr)[manager->indices[i].val];
+                       manager->indices[i].ptr = ptr;
+               }
+       } else if (!manager->view.suboffsets) {
+               /* NumPy-style Python array */
+               for (i = 0; i < manager->view.ndim; i++) {
+                       ptr = (char *)ptr +
+                               manager->indices[i].val * manager->view.strides[i];
+                       manager->indices[i].ptr = ptr;
+               }
+       } else {
+               /* PIL-style Python array */
+               for (i = 0; i < manager->view.ndim; i++) {
+                       ptr = (char *)ptr +
+                               manager->indices[i].val * manager->view.strides[i];
+                       if (manager->view.suboffsets[i] >= 0) {
+                               ptr = *((void *const *)ptr);
+                               ptr = (char *)ptr + manager->view.suboffsets[i];
+                       }
+                       manager->indices[i].ptr = ptr;
+               }
        }
 
-       if (!view.suboffsets) {
-               if (py_dec_numpy_array(call, view.buf, view.ndim, view.shape, view.strides,
-                                      view.itemsize) < 0)
-                       goto error;
-               goto done;
+       /* Set the first blob to be decoded */
+       call->blob = ptr;
+       call->blob_offset = 0;
+       call->decoder_manager = manager;
+       return 1;
+}
+
+/*
+ * Begin decoding the contents of an opaque type into obj.
+ *
+ * Obtains a full buffer view of obj and passes it to py_dec_init_buffer()
+ * with padding enabled.
+ *
+ * Returns 0 on success and -1 if the call already carries an error, the
+ * buffer cannot be obtained or the buffer decode cannot be set up.
+ */
+int py_dec_init_opaque(struct rx_call *call, PyObject *obj)
+{
+       Py_buffer view;
+       int status;
+
+       if (call->error_code)
+               return -1;
+
+       if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0) {
+               debug("*** GET BUFFER FAILED\n");
+               return -1;
        }
 
-       /* PIL-style Python array */
-       if (py_dec_pil_array(call, view.buf, view.ndim, view.shape, view.strides,
-                            view.suboffsets, view.itemsize) < 0)
-               goto error;
+       status = py_dec_init_buffer(call, &view, true);
+       if (status < 0)
+               return -1;
 
-done:
-       PyBuffer_Release(&view);
-       return call->error_code ? -1 : 0;
+       if (call->error_code)
+               return -1;
+       return 0;
+}
 
-error:
-       PyBuffer_Release(&view);
-       return -1;
+/*
+ * Decode a string of bytes into a preallocated unicode string python object.
+ */
+int py_dec_into_string(struct rx_call *call)
+{
+       PyObject *target = call->blob;
+       unsigned remaining, avail, pos;
+
+       rxrpc_post_dec(call);
+
+       remaining = call->blob_size - call->blob_offset;
+       avail = call->data_stop - call->data_cursor;
+       debug("DEC STR dc=%u bsize=%u seg=%u\n", call->data_count, remaining, avail);
+
+       /* Nothing in the current buffer segment: move on to the next one */
+       if (avail == 0)
+               goto need_more;
+
+       /* Never consume more than the blob (or its padding) still wants */
+       if (avail > remaining)
+               avail = remaining;
+
+       /* Padding bytes are consumed and counted, but not stored anywhere */
+       if (call->blob != &rxgen_dec_padding_sink) {
+               for (pos = 0; pos < avail; pos++)
+                       PyUnicode_WRITE(PyUnicode_KIND(target), PyUnicode_DATA(target),
+                                       call->blob_offset + pos, call->data_cursor[pos]);
+       }
+
+       call->blob_decoded += avail;
+       call->blob_offset += avail;
+       call->data_cursor += avail;
+
+       if (call->blob_offset < call->blob_size)
+               goto need_more;
+
+       /* The blob is complete; return 0 unless padding has yet to be eaten */
+       if (call->blob == &rxgen_dec_padding_sink || call->padding_size == 0)
+               return 0;
+
+       /* Soak up the padding to a 32-bit boundary */
+       call->blob = &rxgen_dec_padding_sink;
+       call->blob_size = call->padding_size;
+       call->blob_offset = 0;
+       return 1;
+
+need_more:
+       rxrpc_dec_advance_buffer(call);
+       return 1;
+}
+
+/*
+ * Begin decoding a string: allocate a unicode string object that can hold
+ * call->blob_size characters and set up the call's blob state (blob,
+ * blob_offset, padding_size) to point at it.
+ *
+ * Returns -1 if the allocation fails, 0 if the string is empty (nothing more
+ * to decode) and 1 if there is string data to be decoded.  In the latter two
+ * cases the new string is passed back through *_str.
+ */
+int py_dec_init_string(struct rx_call *call, PyObject **_str)
+{
+       PyObject *str;
+
+       debug("INIT STRING %u\n", call->blob_size);
+
+       str = PyUnicode_New(call->blob_size, 255);
+       if (!str)
+               return -1;
+       *_str = str;
+       if (call->blob_size == 0)
+               return 0;
+
+       if (PyUnicode_READY(str) < 0) {
+               debug("*** STRING NON-CANONICAL\n");
+               abort();
+       }
+
+       call->blob = str;
+       call->blob_offset = 0;
+       /* Pad up to a 32-bit boundary.  The final mask makes the padding 0
+        * rather than 4 when the size is already a multiple of 4, otherwise
+        * the decoder would eat the 4 bytes following the string.
+        */
+       call->padding_size = (4 - (call->blob_size & 3)) & 3;
+       return 1;
 }
 
+/*
+ * Comparator for binary searching the abort code table
+ */
 static int py_rxgen_received_abort_cmp(const void *key, const void *_entry)
 {
        const struct kafs_abort_list *entry = _entry;
        else
                return PyErr_Format(ex, "Aborted %u", call->abort_code);
 }
+
+/*
+ * Clean up a call after decoding: release the buffer view held by the decoder
+ * manager, free the manager and drop the references held on the split-mode
+ * callback and info objects.
+ *
+ * Each pointer is cleared from the call before the thing it points to is
+ * released, so calling this again on the same call does nothing.
+ */
+void py_rxgen_decoder_cleanup(struct rx_call *call)
+{
+       struct py_dec_manager *manager = call->decoder_manager;
+       PyObject *callback = call->decoder_split_callback;
+       PyObject *info = call->decoder_split_info;
+
+       /* Detach everything first so that no stale pointers remain on the call */
+       call->decoder_manager = NULL;
+       call->decoder_split_callback = NULL;
+       call->decoder_split_info = NULL;
+
+       if (manager) {
+               PyBuffer_Release(&manager->view);
+               free(manager);
+       }
+       Py_XDECREF(callback);
+       Py_XDECREF(info);
+}
 
 #ifndef _PY_RXGEN_H
 #define _PY_RXGEN_H
 
+#include "rxgen.h"
+
 struct py_rx_connection {
        PyObject_HEAD
        struct rx_connection *x;
        split_dead
 };
 
-struct py_rx_split_call {
+struct py_rx_split_info {
        PyObject_HEAD
        struct rx_call *call;
-       enum py_rx_split_state state;
+       PyObject *target;
+       enum py_rx_split_state split_state;
+       unsigned state;
+       bool receiving_data;
 };
 
 extern PyTypeObject py_rx_connectionType;
 extern PyObject *kafs_py_string_to_key(PyObject *, PyObject *);
 
 extern int py_rxgen_initialise_members(PyObject *obj, PyObject *kw);
+extern void py_rxgen_decoder_cleanup(struct rx_call *call);
 
 /*
  * Single embedded struct handling
  */
 extern PyObject *py_rxgen_get_string(const void *_p, size_t n);
 extern int py_rxgen_set_string(void *_p, size_t n, PyObject *val);
-extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align);
+extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim);
 extern int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim);
-extern void py_dec_string(struct rx_call *call);
-extern int py_dec_opaque(struct rx_call *call, PyObject *obj);
+extern int py_dec_into_buffer(struct rx_call *call);
+extern int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded);
+extern int py_dec_init_opaque(struct rx_call *call, PyObject *obj);
+extern int py_dec_into_string(struct rx_call *call);
+extern int py_dec_init_string(struct rx_call *call, PyObject **_str);
 
 /*
  * Embedded general array handling
 /*
  * Split-mode RPC call handling
  */
-extern PyTypeObject py_rx_split_callType;
+extern PyTypeObject py_rx_split_infoType;
 extern PyObject *py_rxgen_split_client_prepare(void);
-extern int py_rxgen_split_client_transmit(PyObject *split, PyObject *split_call);
-extern int py_rxgen_split_client_receive(PyObject *split, PyObject *split_call);
+extern int py_rxgen_split_transmit(struct rx_call *call);
+extern int py_rxgen_split_receive(struct rx_call *call, bool init);
 
-static inline void py_rxgen_split_client_set(PyObject *_split_call, struct rx_call *call)
+static inline void py_rxgen_split_client_set(struct rx_call *call,
+                                            PyObject *split_callback,
+                                            PyObject *_split_info)
 {
-       struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
-       split_call->call = call;
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_split_info;
+
+       split_info->call = call;
+       Py_INCREF(split_callback);
+       call->decoder_split_callback = split_callback;
+       call->decoder_split_info = _split_info;
 }
 
 #endif /* _PY_RXGEN_H */
 
 #include "afs_py.h"
 #include "rxgen.h"
 
+#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+
+static int py_rx_split_do_recv(struct rx_call *call,
+                              struct py_rx_split_info *split_info)
+{
+       /* Zero-timeout poll: only asks whether input is already waiting */
+       struct pollfd pfd = {
+               .fd      = call->conn->fd,
+               .events  = POLLIN,
+               .revents = 0,
+       };
+
+       poll(&pfd, 1, 0);
+       if (!(pfd.revents & POLLIN))
+               return 0;
+
+       /* Something is queued on the connection's socket; pull it in now */
+       rxrpc_recv_data(call->conn, true);
+       return 0;
+}
+
+static int py_rx_split_do_send_recv(struct rx_call *call,
+                                   struct py_rx_split_info *split_info,
+                                   bool more)
+{
+       int sent;
+
+       /* Final chunk: clear more_send and return the split info to idle */
+       if (more == false) {
+               split_info->split_state = split_idle;
+               call->more_send = 0;
+       }
+
+       sent = rxrpc_send_data(call);
+       if (sent == -1)
+               return -1;
+
+       /* Take the opportunity to collect anything that has already arrived */
+       return py_rx_split_do_recv(call, split_info);
+}
+
 /*
- * Send an RPC call: split.send(data, more)
+ * Send an RPC call: split_info.send(data, more)
  */
 static PyObject *py_rx_split_send(PyObject *_self, PyObject *args)
 {
-       struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
-       struct rx_call *call = self->call;
-       struct pollfd fds[1];
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
        Py_buffer data;
-       int more;
+       int more = 0;
 
        if (!call)
                abort();
 
-       if (self->state != split_transmitting)
+       if (split_info->split_state != split_transmitting)
                abort();
 
        if (!PyArg_ParseTuple(args, "y*p", &data, &more))
                return NULL;
 
-       if (py_enc_buffer_raw(call, &data, UINT_MAX, false) == -1)
+       if (py_enc_buffer_raw(call, &data, UINT_MAX) == -1)
                return PyErr_SetFromErrno(PyExc_OSError);
 
-       if (!more) {
-               call->more = 0;
-               self->state = split_idle;
-       }
-       if (rxrpc_send_data(call) == -1)
+       if (py_rx_split_do_send_recv(call, split_info, more) < 0)
                return PyErr_SetFromErrno(PyExc_OSError);
 
-       fds[0].fd = call->conn->fd;
-       fds[0].events = POLLIN;
-       fds[0].revents = 0;
-       poll(fds, 1, 0);
-       if (fds[0].revents & POLLIN)
-               rxrpc_recv_data(call->conn, true);
+       Py_RETURN_NONE;
+}
+
+/*
+ * Request reception of some data, where the amount of data is potentially
+ * zero.  This can only be used if there is no non-split data to be received
+ * after the split data as this commits the caller to handling all the
+ * remaining data in this phase.
+ *
+ * The caller should use split_info.data_available() to work out how much data
+ * is available, if any.
+ */
+static PyObject *py_rx_split_will_recv_all(PyObject *_self, PyObject *args)
+{
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
 
+       call->need_size = UINT_MAX;
        Py_RETURN_NONE;
 }
 
 /*
- * Recv an RPC call: v = split.recv(n)
+ * Receive a fixed-size object: ret = split_info.begin_recv(buffer)
+ *
+ * The object's intrinsic size is used to determine how much data we want for
+ * it.  Potentially 0-length, variable sized data cannot be handled this way.
+ *
+ * Returns NULL on error; True on success (ie. may be more to receive).
  */
-static PyObject *py_rx_split_recv(PyObject *_self, PyObject *args)
+static PyObject *py_rx_split_begin_recv(PyObject *_self, PyObject *args)
 {
-       struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
-       struct rx_call *call = self->call;
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
+       Py_buffer buffer;
+       int ret, padded = 1;
 
-       if (self->state != split_receiving)
+       if (!call)
+               abort();
+       if (split_info->split_state != split_receiving)
                abort();
 
-       abort();
-       return NULL;
+       if (!PyArg_ParseTuple(args, "w*|p", &buffer, &padded))
+               return NULL;
+
+       ret = py_dec_init_buffer(call, &buffer, padded);
+       switch (ret) {
+       case -1: return NULL;
+       case  0: split_info->state++; Py_RETURN_TRUE;
+       case  1: split_info->receiving_data = true; Py_RETURN_TRUE;
+       default: abort();
+       }
+}
+
+/*
+ * Query how much data is in the Rx buffers of an RPC call: n = split_info.data_available()
+ *
+ * Returns call->data_count as an integer (possibly 0) whilst there is
+ * buffered data or more data is still expected, and None once there is
+ * neither.  If the call is not in one of the decoding states, NULL is returned
+ * with an exception set.
+ */
+static PyObject *py_rx_split_data_available(PyObject *_self, PyObject *args)
+{
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
+
+       switch (call->state) {
+       case rx_call_cl_decoding_response:
+       case rx_call_sv_decoding_opcode:
+       case rx_call_sv_decoding_params:
+               break;
+       default:
+               /* Aborted or some other error.  A NULL return must always be
+                * accompanied by an exception, so raise one here unless one
+                * is already pending.
+                */
+               if (!PyErr_Occurred())
+                       PyErr_SetString(PyExc_RuntimeError,
+                                       "Call is not in a state to receive data");
+               return NULL;
+       }
+
+       debug("DAVAIL: %u %d\n", call->data_count, call->more_recv);
+
+       if (call->data_count > 0 || call->more_recv)
+               return PyLong_FromUnsignedLong(call->data_count);
+       else
+               Py_RETURN_NONE;
 }
 
 /*
  * Methods applicable to split-call objects
  */
-static PyMethodDef py_rx_split_methods[] = {
+static PyMethodDef py_rx_split_info_methods[] = {
        {"send", (PyCFunction)py_rx_split_send, METH_VARARGS,
         "" },
-       {"recv", (PyCFunction)py_rx_split_recv, METH_VARARGS,
+       {"will_recv_all", (PyCFunction)py_rx_split_will_recv_all, METH_NOARGS,
         "" },
+       {"begin_recv", (PyCFunction)py_rx_split_begin_recv, METH_VARARGS,
+        "" },
+       {"data_available", (PyCFunction)py_rx_split_data_available, METH_NOARGS,
+        "" },
+       {}
+};
+
+static PyMemberDef py_rx_split_info_members[] = {
+       { "state", T_UINT, offsetof(struct py_rx_split_info, state), 0, ""},
+       { "target",  T_OBJECT, offsetof(struct py_rx_split_info, target), 0, ""},
        {}
 };
 
 static int
 py_rx_split_init(PyObject *_self, PyObject *args, PyObject *kwds)
 {
-       struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
+       struct py_rx_split_info *self = (struct py_rx_split_info *)_self;
        self->call = NULL;
+       self->target = NULL;
+       self->split_state = split_idle;
+       self->state = 0;
+       self->receiving_data = false;
        return 0;
 }
 
 static void
-py_rx_split_dealloc(struct py_rx_split_call *self)
+py_rx_split_dealloc(struct py_rx_split_info *self)
 {
-       assert(self->state != split_dead);
-       self->state = split_dead;
+       assert(self->split_state != split_dead);
+       self->split_state = split_dead;
        self->call = NULL;
+       Py_XDECREF(self->target);
        Py_TYPE(self)->tp_free((PyObject *)self);
 }
 
-PyTypeObject py_rx_split_callType = {
+PyTypeObject py_rx_split_infoType = {
        PyVarObject_HEAD_INIT(NULL, 0)
-       "kafs.rx_split_call",           /*tp_name*/
-       sizeof(struct py_rx_split_call), /*tp_basicsize*/
+       "kafs.rx_split_info",           /*tp_name*/
+       sizeof(struct py_rx_split_info), /*tp_basicsize*/
        0,                              /*tp_itemsize*/
        (destructor)py_rx_split_dealloc, /*tp_dealloc*/
        0,                              /*tp_print*/
        0,                              /* tp_weaklistoffset */
        0,                              /* tp_iter */
        0,                              /* tp_iternext */
-       py_rx_split_methods,            /* tp_methods */
-       0,                              /* tp_members */
+       py_rx_split_info_methods,       /* tp_methods */
+       py_rx_split_info_members,       /* tp_members */
        0,                              /* tp_getset */
        0,                              /* tp_base */
        0,                              /* tp_dict */
 };
 
 /*
- * Check a split-RPC object has transmit and receive functions.
+ * Allocate an information object.
  */
 PyObject *py_rxgen_split_client_prepare(void)
 {
-       struct py_rx_split_call *obj;
+       struct py_rx_split_info *obj;
 
-       obj = (struct py_rx_split_call *)_PyObject_New(&py_rx_split_callType);
+       obj = (struct py_rx_split_info *)_PyObject_New(&py_rx_split_infoType);
        if (!obj)
                return PyErr_NoMemory();
        py_rx_split_init((PyObject *)obj, NULL, NULL);
 /*
  * Perform split-RPC transmission.
  */
-int py_rxgen_split_client_transmit(PyObject *split, PyObject *_split_call)
+int py_rxgen_split_transmit(struct rx_call *call)
 {
-       struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
+       struct py_rx_split_info *split_info = call->decoder_split_info;
+       PyObject *callback = call->decoder_split_callback;
        PyObject *result;
        int ret;
 
-       assert(split_call->state == split_idle);
+       assert(split_info->split_state == split_idle);
 
-       split_call->state = split_transmitting;
-       result = PyObject_CallMethod(split, "client_transmit", "O", _split_call);
-       if (result) {
+       /* Hand control to the Python-side transmit() method of the callback
+        * object, passing it the split info object.
+        */
+       split_info->split_state = split_transmitting;
+       result = PyObject_CallMethod(callback, "transmit",
+                                    "O", call->decoder_split_info);
+       if (result == Py_None) {
+               /* transmit() returned None: send with more=false ourselves.
+                * The result is a new reference and must be dropped here too.
+                */
+               ret = py_rx_split_do_send_recv(call, split_info, false);
+               Py_DECREF(result);
+       } else if (result) {
                ret = PyObject_IsTrue(result) ? 0 : -1;
                Py_DECREF(result);
        } else {
                ret = -1;
        }
-       if (ret == 0 && split_call->state != split_idle)
+       if (ret == 0 && split_info->split_state != split_idle)
                abort();
        return ret;
 }
 /*
  * Perform split-RPC reception.
  */
-int py_rxgen_split_client_receive(PyObject *split, PyObject *_split_call)
+int py_rxgen_split_receive(struct rx_call *call, bool init)
 {
-       struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
+       struct py_rx_split_info *split_info = call->decoder_split_info;
+       PyObject *callback = call->decoder_split_callback;
        PyObject *result;
        int ret;
 
-       assert(split_call->state == split_idle);
+       assert(split_info->split_state == split_idle);
 
-       split_call->state = split_receiving;
-       result = PyObject_CallMethod(split, "client_receive", "O", _split_call);
-       split_call->state = split_idle;
-       assert(result != NULL);
+       debug("-->%s(%u,%u,%u)\n", __func__, init, split_info->state, split_info->receiving_data);
 
-       if (result) {
-               ret = PyObject_IsTrue(result) ? 0 : -1;
-               Py_DECREF(result);
+       if (init)
+               split_info->state = 0;
+
+again:
+       /* If a buffer is currently being filled, carry on with that before
+        * going back to the Python-side receive() method.
+        */
+       if (split_info->receiving_data) {
+               switch (py_dec_into_buffer(call)) {
+               case -1:
+                       result = PyObject_CallMethod(callback, "receive_failed",
+                                                    "O", call->decoder_split_info);
+                       Py_XDECREF(result);
+                       return -1;
+               case  1: /* More data needed */
+                       return 1;
+               case  0:
+                       split_info->receiving_data = false;
+                       split_info->state++;
+                       break;
+               }
+       }
+
+       split_info->split_state = split_receiving;
+       result = PyObject_CallMethod(callback, "receive",
+                                    "O", call->decoder_split_info);
+       split_info->split_state = split_idle;
+
+       if (!result)
+               return -1;
+
+       if (result == Py_None) {
+               /* Split receive phase not used */
+               call->need_size = 0;
+               ret = 0;
+       } else if (result == Py_False) {
+               /* Split receive phase complete */
+               ret = 0;
+       } else if (result == Py_True) {
+               /* Wanting to receive data or reenter in a new state */
+               if (split_info->receiving_data) {
+                       /* Drop the reference to this result before looping,
+                        * as result is overwritten on the next pass.
+                        */
+                       Py_DECREF(result);
+                       goto again;
+               }
+               ret = 1;
        } else {
+               PyErr_Format(PyExc_TypeError,
+                            "Expected True, False or None return from split receive function");
                ret = -1;
        }
+       Py_DECREF(result);
        return ret;
 }
 
 #include "af_rxrpc.h"
 #include <stdbool.h>
 #include <errno.h>
+#include <stdlib.h>
 
 typedef uint32_t net_xdr_t;
 
        enum rx_call_state state;
        unsigned        known_to_kernel : 1;
        unsigned        secured : 1;
-       unsigned        more : 1;
+       unsigned        more_send : 1;
+       unsigned        more_recv : 1;
        int             error_code;
        uint32_t        abort_code;
        unsigned        need_size;
 
-       unsigned long long bytes_sent, bytes_received;
+       unsigned long long bytes_sent, bytes_received, blob_decoded;
 
        /* Service routines */
        void (*processor)(struct rx_call *call);
        struct rx_buf   *buffer_tail;
        unsigned        data_count;
        unsigned        buffer_space;
+       unsigned        padding_size;
 
        /* Decoding support */
        unsigned        phase;          /* Encode/decode phase */
+       unsigned        blob_size;      /* Size of blob being encoded/decoded */
+       unsigned        blob_offset;    /* Offset into blob */
        unsigned        bulk_count;     /* Number of items in bulk array */
        unsigned        bulk_index;     /* Index of item being processed */
+       void            *blob;          /* Blob being encoded/decoded */
        union {
                void            *bulk_item;     /* Pointer to string/bytes/struct being processed */
                uint32_t        bulk_u32;       /* 8/16/32-bit integer being processed */
        };
        int (*decoder)(struct rx_call *call);
        void            *decoder_private;
+       void            *decoder_manager;
+       void            *decoder_split_callback;
+       void            *decoder_split_info;
+       void (*decoder_cleanup)(struct rx_call *call);
 };
 
+extern uint32_t rxgen_dec_padding_sink;
 
-extern int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size);
+extern int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size);
 extern void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data);
 static inline void rxrpc_enc(struct rx_call *call, uint32_t data)
 {
        uint32_t xdr_data = htonl(data);
-       if (__builtin_expect(call->data_cursor < call->data_stop, 1)) {
+       if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 &&
+                            call->data_cursor < call->data_stop, 1)) {
                *(net_xdr_t *)call->data_cursor = xdr_data;
                call->data_cursor += 4;
        } else {
 
 static inline void rxrpc_enc_align(struct rx_call *call)
 {
+       abort(); // Can't assume data_cursor is 4-byte aligned
        while ((unsigned long)call->data_cursor & 3)
                *(call->data_cursor++) = 0;
 }
        }
 }
 
-extern void rxrpc_dec_bytes(struct rx_call *call);
+extern void rxrpc_dec_blob(struct rx_call *call);
 extern uint32_t rxrpc_dec_slow(struct rx_call *call);
 static inline uint32_t rxrpc_dec(struct rx_call *call)
 {
-       if (__builtin_expect(call->data_cursor < call->data_stop, 1)) {
+       if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 &&
+                            call->data_cursor < call->data_stop, 1)) {
                net_xdr_t x = *(net_xdr_t *)call->data_cursor;
                call->data_cursor += sizeof(x);
                return ntohl(x);
 static inline void rxrpc_dec_align(struct rx_call *call)
 {
        unsigned long cursor = (unsigned long)call->data_cursor;
+       abort(); // Can't assume data_cursor is 4-byte aligned
        cursor = (cursor + 3) & ~3UL;
        call->data_cursor = (uint8_t*)cursor;
 }
                size_t n = call->data_cursor - call->data_start;
                call->data_start = call->data_cursor;
                call->data_count -= n;
+               call->need_size -= n;
                return 0;
        } else {
                errno = call->error_code;
        }
 }
 
+extern void rxrpc_dec_advance_buffer(struct rx_call *call);
 extern int rxgen_dec_discard_excess(struct rx_call *call);
 
 extern struct rx_connection *rx_new_connection(const struct sockaddr *sa,
 
 
        if ($p->{class} eq "array") {
            die $p->{where}, ": Array arg not supported";
-       } elsif ($p->{class} eq "bulk" &&
-           !($p->{elem}->{class} eq "string" ||
-             $p->{elem}->{class} eq "opaque")
-           ) {
+       } elsif ($p->{class} eq "bulk") {
            # Encode
            if ($p->{elem}->{class} eq "struct") {
                $proto  = "int (*get__" . $p->{name} . ")(struct rx_call *call, void *token)";
            push @declines, "void *token__" . $p->{name};
            push @declines, "size_t nr__" . $p->{name};
            push @args, "token__" . $p->{name};
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                          $p->{elem}->{class} eq "opaque")) {
+       } elsif ($p->{class} eq "blob") {
            $proto = $p->{type} . " " . $p->{ptr} . $p->{name};
            push @enclines, "size_t nr__" . $p->{name};
            push @enclines, "const " . $proto;
            ) {
            $phase->{size} +=  $p->{xdr_size};
            push @{$phase->{params}}, $p;
+       } elsif ($p->{class} eq "blob") {
+           $have_bulk = 2;
+
+           # Blob objects begin with a 4-byte size
+           $phase->{elem_count} = $phase->{size};
+           $phase->{size} +=  4;
+
+           my %pseudoparam = (
+               class   => "basic",
+               type    => "blob_size",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               where   => $p->{where},
+               xdr_size => $p->{xdr_size},
+           );
+           push @{$phase->{params}}, \%pseudoparam;
+
+           # Create a new phase
+           $phase = {
+               type    => "blob",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               params  => [ $p ],
+               xdr_size => $p->{xdr_size},
+               size    => $p->{xdr_size},
+           };
+           push @phases, $phase;
+
+           $phase->{size} = 4;
+           $phase = 0;
        } elsif ($p->{class} eq "bulk") {
            $have_bulk = 1 if ($have_bulk == 0);
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               $have_bulk = 2;
-           }
 
            # Bulk objects begin with an element count
            $phase->{elem_count} = $phase->{size};
            };
            push @phases, $phase;
 
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               $phase->{size} = 4;
-           } else {
-               # We don't want to be sending one object at a time if they're
-               # really small.
-               my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
-               $n_buf *= $p->{xdr_size};
-               $phase->{size} = $p->{xdr_size};
-           }
+           # We don't want to be sending one object at a time if they're
+           # really small.
+           my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
+           $n_buf *= $p->{xdr_size};
+           $phase->{size} = $p->{xdr_size};
            $phase = 0;
        } else {
            die $p->{where}, "Encode array arg not supported";
 
     unless (@params) {
        die if ($side eq "client");
-       print RXOUT "\tcall->more = 0;\n";
+       print RXOUT "\tcall->more_send = 0;\n";
        print RXOUT "\treturn 0;\n";
        print RXOUT "}\n";
        return;
     print RXOUT "\tswitch (phase) {\n";
 
     print RXOUT "\tcase 0:\n";
-    print RXOUT "\t\tcall->more = 1;\n";
+    print RXOUT "\t\tcall->more_send = 1;\n";
 
     my $phase_goto_label = 0;
     my $phix;
        }
 
        # Determine how big bulk objects are
-       if ($phase->{type} eq "bulk") {
+       if ($phase->{type} eq "blob") {
+           my $p = $phase->{params}->[0];
+           print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n";
+           print RXOUT "\t\tif (call->blob_size == 0)\n";
+           print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+           $phase_goto_label = $phix + 1;
+           print RXOUT "\t\tcall->blob_offset = 0;\n";
+       } elsif ($phase->{type} eq "bulk") {
            my $p = $phase->{params}->[0];
            print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n";
            print RXOUT "\t\tif (call->bulk_count == 0)\n";
 
        # Marshal the data
        foreach my $p (@{$phase->{params}}) {
-           if ($p->{type} eq "bulk_size") {
+           if ($p->{type} eq "blob_size") {
+               print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n";
+               $close_phase = 0;
+               next;
+           } elsif ($p->{type} eq "bulk_size") {
                print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n";
                $close_phase = 0;
                next;
                die $p->{where}, ": Unsupported type in decode";
            }
 
-           if ($p->{class} eq "bulk") {
+           if ($p->{class} eq "blob") {
+               print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+               print RXOUT "\t\t\tphase = ", $phix, ";\n";
+               print RXOUT "\t\t\tgoto select_phase;\n";
+               print RXOUT "\t\t}\n";
+           } elsif ($p->{class} eq "bulk") {
                print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
                print RXOUT "\t\t\tphase = ", $phix, ";\n";
                print RXOUT "\t\t\tgoto select_phase;\n";
     print RXOUT "\tcase ", $phix, ":\n";
     print RXOUT "\t\tif (rxrpc_post_enc(call) < 0)\n";
     print RXOUT "\t\t\treturn -1;\n";
-    print RXOUT "\t\tcall->more = 0;\n";
+    print RXOUT "\t\tcall->more_send = 0;\n";
     print RXOUT "\t\tbreak;\n";
     print RXOUT "\t}\n";
 
            ) {
            $phase->{size} +=  $p->{xdr_size};
            push @{$phase->{params}}, $p;
+       } elsif ($p->{class} eq "blob") {
+           $have_bulk = 1;
+
+           # Blob objects begin with a 4-byte size
+           $phase->{elem_count} = $phase->{size};
+           $phase->{size} +=  4;
+
+           my %pseudoparam = (
+               class   => "basic",
+               type    => "blob_size",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               where   => $p->{where},
+               xdr_size => $p->{xdr_size},
+               );
+           push @{$phase->{params}}, \%pseudoparam;
+
+           # Create a new phase
+           $phase = {
+               type => "blob",
+               name => $p->{name},
+               params => [ $p ],
+               size => 4,
+               xdr_size => $p->{xdr_size},
+           };
+           push @phases, $phase;
+           $phase = 0;
        } elsif ($p->{class} eq "bulk") {
            $have_bulk = 1;
 
            };
            push @phases, $phase;
 
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               $phase->{bytearray} = 1;
-           }
-
            # We don't want to be asking recvmsg() for one object at a time if
            # they're really small.
            my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
        }
 
        # Determine how big bulk objects are
-       if ($phase->{type} eq "bulk") {
+       if ($phase->{type} eq "blob") {
+           my $p = $phase->{params}->[0];
+           print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n";
+           print RXOUT "\t\tcall->blob_offset = UINT_MAX;\n";
+           print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
+           print RXOUT "\t\t\treturn -1;\n";
+           print RXOUT "\t\tif (call->blob_size == 0)\n";
+           print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+           $phase_goto_label = $phix + 1;
+           print RXOUT "\t\tcall->blob_offset = 0;\n";
+       } elsif ($phase->{type} eq "bulk") {
            my $p = $phase->{params}->[0];
            print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n";
            print RXOUT "\t\tcall->bulk_index = UINT_MAX;\n";
 
            if ($p->{elem}->{class} eq "basic") {
                print RXOUT "\t\tif (", $ptr, "store__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
-           } elsif ($p->{elem}->{class} eq "string" ||
-                    $p->{elem}->{class} eq "opaque") {
-               print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
            } else {
                print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
            }
        print RXOUT "\tcase ", $phix, ":\n";
 
        print RXOUT "\t\tif (count < ", $phase->{size}, ")";
-       if ($phase->{type} eq "bulk" && !exists($phase->{bytearray}) &&
-           $phase->{xdr_size} <= 512) {
+       if ($phase->{type} eq "bulk" &&
+           $phase->{xdr_size} <= 512
+           ) {
            print RXOUT " {\n";
            print RXOUT "\t\t\tunsigned n = call->bulk_count - call->bulk_index;\n";
            print RXOUT "\t\t\tn = MIN(n, ", int(1024 / $phase->{xdr_size}), ");\n";
        # Unmarshal the data
        print RXOUT "\n";
        foreach my $p (@{$phase->{params}}) {
-           if ($p->{type} eq "bulk_size") {
+           if ($p->{type} eq "blob_size") {
+               print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n";
+               next;
+           } elsif ($p->{type} eq "bulk_size") {
                print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n";
                next;
            }
                print RXOUT "\t\t\treturn -1;\n";
                print RXOUT "\t\trxgen_decode_", $p->{type}, "(call, call->bulk_item);\n";
                print RXOUT "\t\tcall->bulk_index++;\n";
-           } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                              $p->{elem}->{class} eq "opaque")) {
-               print RXOUT "\t\trxrpc_dec_bytes(call);\n";
+           } elsif ($p->{class} eq "blob") {
+               print RXOUT "\t\trxrpc_dec_blob(call);\n";
                print RXOUT "\t\trxrpc_dec_align(call);\n";
            } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
                print RXOUT "\t\t", $ptr, $p->{name}, " = rxrpc_dec(call);\n";
                die $p->{where}, ": Unsupported type in decode";
            }
 
-           if ($p->{class} eq "bulk") {
+           if ($p->{class} eq "blob") {
+               print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+               print RXOUT "\t\t\treturn -1;\n";
+               print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+               print RXOUT "\t\t\tphase = ", $phix, ";\n";
+               print RXOUT "\t\t\tgoto select_phase;\n";
+               print RXOUT "\t\t}\n";
+           } elsif ($p->{class} eq "bulk") {
                print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
                print RXOUT "\t\t\treturn -1;\n";
                print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
            }
        }
 
-       if ($phase->{type} ne "bulk") {
+       # The blob and bulk parameter branches above already emit their own
+       # rxrpc_post_dec() check, so only the other phase types need one here.
+       # (Both tests must hold: with "||" the condition is always true.)
+       if ($phase->{type} ne "blob" && $phase->{type} ne "bulk") {
            print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
            print RXOUT "\t\t\treturn -1;\n";
        }
 
     print RXOUT "\tstruct rx_call *call;\n" if ($what eq "request");
 
-    my @all_bulk_params = grep { $_->{class} eq "bulk"; } @{$params};
-
-    my @bulk_params = grep { ($_->{elem}->{class} ne "string" &&
-                             $_->{elem}->{class} ne "opaque"); } @all_bulk_params;
+    my @blob_params = grep { $_->{class} eq "blob"; } @{$params};
+    my @bulk_params = grep { $_->{class} eq "bulk"; } @{$params};
 
     # Local variables
     print RXOUT "\tint ret;\n";
 
     # Check lengths
-    if (@all_bulk_params) {
+    if (@blob_params || @bulk_params) {
        print RXOUT "\n";
        print RXOUT "\tif (";
        my $first = 1;
-       foreach my $p (@all_bulk_params) {
+       foreach my $p (@blob_params) {
            if ($first) {
                $first = 0;
            } else {
                print RXOUT " ||\n\t    ";
            }
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               print RXOUT "!", $p->{name};
+           print RXOUT "!", $p->{name};
+           if (exists($p->{dim})) {
+               print RXOUT " || nr__", $p->{name}, " > ", $p->{dim};
+           }
+       }
+       foreach my $p (@bulk_params) {
+           if ($first) {
+               $first = 0;
            } else {
-               print RXOUT "!get__", $p->{name};
+               print RXOUT " ||\n\t    ";
            }
+           print RXOUT "!get__", $p->{name};
            if (exists($p->{dim})) {
                print RXOUT " || nr__", $p->{name}, " > ", $p->{dim};
            }
            print RXOUT "\trxrpc_enc(call, (uint32_t)(", $p->{name}, " >> 32));\n";
        } elsif ($p->{class} eq "struct") {
            print RXOUT "\trxgen_encode_", $p->{type}, "(call, ", $p->{name}, ");\n";
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                          $p->{elem}->{class} eq "opaque")) {
-           print RXOUT "\trxrpc_enc_bytes(call, ", $p->{name}, ", nr__", $p->{name}, ");\n";
+       } elsif ($p->{class} eq "blob") {
+           print RXOUT "\trxrpc_enc_blob(call, ", $p->{name}, ", nr__", $p->{name}, ");\n";
            print RXOUT "\trxrpc_enc_align(call);\n";
+       } elsif ($p->{class} eq "blob") {
+           print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n";
+           print RXOUT "\tcall->blob_size = nr__", $p->{name}, ";\n";
+           print RXOUT "\tfor (call->blob_offset = 0; call->blob_offset < call->blob_size; call->blob_offset++) {\n";
+           if ($p->{elem}->{class} eq "struct") {
+               print RXOUT "\t\tstruct ", $p->{elem}->{type}, " x;\n";
+           } else {
+               print RXOUT "\t\t", $p->{elem}->{type}, " x;\n";
+           }
+           print RXOUT "\t\tcall->blob = &x;\n";
+           print RXOUT "\t\tif (get__", $p->{name}, "(call, token__", $p->{name}, ") < 0)\n";
+           print RXOUT "\t\t\tgoto error;\n";
+           die $p->{where}, "No decoding for array type '$type'";
+           print RXOUT "\t}\n";
        } elsif ($p->{class} eq "bulk") {
            print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n";
            print RXOUT "\tcall->bulk_count = nr__", $p->{name}, ";\n";
 
     print RXOUT "\tif (rxrpc_post_enc(call) < 0)\n";
     print RXOUT "\t\tgoto error;\n";
-    print RXOUT "\tcall->more = 0;\n";
+    print RXOUT "\tcall->more_send = 0;\n";
 
     # Send the message
     print RXOUT "\n";
 
     if (@py_type_defs) {
        print PYOUT "\tif (";
        print PYOUT "PyType_Ready(&py_rx_connectionType) < 0 ||\n\t    ";
-       print PYOUT "PyType_Ready(&py_rx_split_callType) < 0";
+       print PYOUT "PyType_Ready(&py_rx_split_infoType) < 0";
        my $first = 0;
        foreach my $def (@py_type_defs) {
            print PYOUT " ||\n\t    " unless ($first);
 
                push @complex, $p;
                print PYHDR "\t\tPyObject\t*", $p->{name}, ";\n";
            }
-           $have_opaque = 1 if ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque");
+           $have_opaque = 1 if ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque");
        }
        print PYHDR "\t} x;\n";
        print PYHDR "\tPy_buffer dec_buf;\n" if ($have_opaque);
        print PYOUT "static PyMemberDef ", $struct_req, "_members[] = {\n";
        foreach my $p (@{$params}) {
            print PYOUT "\t{ \"", $p->{name}, "\", ";
-           if ($p->{class} eq "bulk") {                print PYOUT "T_OBJECT_EX";
+           if ($p->{class} eq "blob") {                print PYOUT "T_OBJECT_EX";
+           } elsif ($p->{class} eq "bulk") {           print PYOUT "T_OBJECT_EX";
            } elsif ($p->{type} eq "char"    ) {        print PYOUT "T_CHAR";
            } elsif ($p->{type} eq "int8_t"  ) {        print PYOUT "T_BYTE";
            } elsif ($p->{type} eq "int16_t" ) {        print PYOUT "T_SHORT";
 
     foreach my $p (@{$func->{params}}) {
        next if ($p->{class} ne "bulk");
-       next if ($p->{elem}->{class} eq "string" || $p->{elem}->{class} eq "opaque");
 
        # Data encoding
        if (!exists $bulk_get_helpers{$p->{type}}) {
     print PYOUT "\tstruct py_rx_connection *z_conn;\n";
     print PYOUT "\tstruct py_", $func->{name}, "_response *response;\n";
     foreach my $p (@{$func->{request}}) {
-       if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                     $p->{elem}->{class} eq "opaque")) {
+       if ($p->{class} eq "blob") {
            print PYOUT "\tPy_buffer param_", $p->{name}, ";\n";
        } elsif ($p->{class} eq "basic") {
            print PYOUT "\t", $p->{type}, " param_", $p->{name}, ";\n";
        }
     }
 
-    print PYOUT "\tPyObject *split, *split_call;\n" if ($func->{split});
+    print PYOUT "\tPyObject *split_callback, *split_info;\n" if ($func->{split});
     print PYOUT "\tPyObject *res = NULL;\n";
     print PYOUT "\tint ret;\n";
 
        } elsif ($p->{type} eq "uint32_t") {    print PYOUT "I";
        } elsif ($p->{type} eq "uint64_t") {    print PYOUT "K";
        } elsif ($p->{class} eq "struct") {     print PYOUT "O!";
-       } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") {
+       } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "string") {
            print PYOUT "s*";
-       } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") {
+       } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque") {
            print PYOUT "z*";
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
-                                          $p->{elem}->{class} eq "struct")) {
+       } elsif ($p->{class} eq "bulk") {
            print PYOUT "O!";
        } else {
            die $p->{where}, ": No py parse for param";
     foreach my $p (@{$func->{request}}) {
        print PYOUT ",\n";
        print PYOUT "\t\t\t      ";
-       if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                     $p->{elem}->{class} eq "opaque")) {
-           print PYOUT "¶m_", $p->{name};
-       } elsif ($p->{class} eq "basic") {
+       if ($p->{class} eq "basic") {
            print PYOUT "¶m_", $p->{name};
        } elsif ($p->{class} eq "struct") {
            print PYOUT "&py_", $p->{type}, "Type, ¶m_", $p->{name};
+       } elsif ($p->{class} eq "blob") {
+           print PYOUT "¶m_", $p->{name};
        } elsif ($p->{class} eq "bulk") {
            print PYOUT "&PyList_Type, ¶m_", $p->{name};
        } else {
            die $p->{where}, ": Unsupported type \"", $p->{type}, "\"";
        }
     }
-    print PYOUT ",\n\t\t\t      &split" if ($func->{split});
+    print PYOUT ",\n\t\t\t      &split_callback" if ($func->{split});
     print PYOUT "))\n";
     print PYOUT "\t\treturn NULL;\n";
 
     if ($func->{split}) {
        print PYOUT "\n";
-       print PYOUT "\tsplit_call = py_rxgen_split_client_prepare();\n";
-       print PYOUT "\tif (!split_call)\n";
+       print PYOUT "\tsplit_info = py_rxgen_split_client_prepare();\n";
+       print PYOUT "\tif (!split_info)\n";
        print PYOUT "\t\treturn NULL;\n";
     }
 
     print PYOUT "\n";
     print PYOUT "\tcall = rxrpc_alloc_call(z_conn->x, 0);\n";
     print PYOUT "\tif (!call) {\n";
-    print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
+    print PYOUT "\t\tPy_XDECREF(split_info);\n" if ($func->{split});
     print PYOUT "\t\treturn PyErr_NoMemory();\n";
     print PYOUT "\t}\n";
-    print PYOUT "\tpy_rxgen_split_client_set(split_call, call);\n" if ($func->{split});
+    print PYOUT "\tcall->decoder_cleanup = py_rxgen_decoder_cleanup;\n";
+    print PYOUT "\tpy_rxgen_split_client_set(call, split_callback, split_info);\n"
+       if ($func->{split});
 
     # Marshal the arguments
     print PYOUT "\n";
     print PYOUT "\trxrpc_enc(call, ", $func->{opcode}, ");\n";
     foreach my $p (@{$func->{request}}) {
-       if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
-                                     $p->{elem}->{class} eq "struct")) {
-           print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n";
-           print PYOUT "\t\tgoto error;\n";
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                          $p->{elem}->{class} eq "opaque")) {
+       if ($p->{class} eq "blob") {
            my $dim = -1;
            $dim = $p->{dim} if exists $p->{dim};
            print PYOUT "\tif (py_enc_buffer(call, ¶m_", $p->{name}, ", ", $dim, ") < 0) {\n";
            print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n";
-           print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
            print PYOUT "\t\treturn NULL;\n";
            print PYOUT "\t}\n";
+       } elsif ($p->{class} eq "bulk") {
+           print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n";
+           print PYOUT "\t\tgoto error;\n";
        } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
            print PYOUT "\trxrpc_enc(call, param_", $p->{name}, ");\n";
        } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) {
        } elsif ($p->{class} eq "struct") {
            print PYOUT "\tif (py_premarshal_", $p->{type}, "((PyObject *)param_", $p->{name}, ")) {\n";
            print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n";
-           print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
            print PYOUT "\t\treturn NULL;\n";
            print PYOUT "\t}\n";
            print PYOUT "\trxgen_encode_", $p->{type}, "(call, ¶m_", $p->{name}, "->x);\n";
 
     # Transmit the split data
     if ($func->{split}) {
-       print PYOUT "\tif (py_rxgen_split_client_transmit(split, split_call) < 0)\n";
+       print PYOUT "\tif (py_rxgen_split_transmit(call) < 0)\n";
        print PYOUT "\t\tgoto error_no_res;\n";
     } else {
-       print PYOUT "\tcall->more = 0;\n";
+       print PYOUT "\tcall->more_send = 0;\n";
 
        # Make the call
        print PYOUT "\n";
     # Successful return
     print PYOUT "\n";
     print PYOUT "\trxrpc_terminate_call(call, 0);\n";
-    print PYOUT "\tPy_XDECREF(split_call);\n" if ($func->{split});
     print PYOUT "\treturn res;\n";
 
     # Error cleanups
     # in the previous phase.  Variable-sized bulk arrays are split across
     # multiple phases, with the length being at the end of the first phase and
     # the data in the second.
+    #
+    # We also need to interpolate a phase to deal with decoding split-op
+    # auxiliary data.  This comes last when decoding the request and first when
+    # decoding the response.
+    #
     my @phases = ();
     my $phase = 0;
     my $have_bulk = 0;
     my $want_item = 0;
 
+    if ($func->{split} && $subname eq "response") {
+       $phase = {
+           type => "split",
+           size => "py_rxgen_split_receive(call)",
+           params => []
+       };
+       push @phases, $phase;
+       $phase = 0;
+       $have_bulk = 1;
+    }
+
     foreach my $p (@params) {
        unless ($phase) {
            $phase = { type => "flat", size => 0, params => [] };
            ) {
            $phase->{size} +=  $p->{xdr_size};
            push @{$phase->{params}}, $p;
+       } elsif ($p->{class} eq "blob") {
+           $have_bulk = 1;
+
+           # Blob objects begin with a 4-byte size word
+           $phase->{elem_count} = $phase->{size};
+           $phase->{size} +=  4;
+
+           my %pseudoparam = (
+               class   => "basic",
+               type    => "blob_size",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               where   => $p->{where},
+               xdr_size => $p->{xdr_size},
+               );
+           push @{$phase->{params}}, \%pseudoparam;
+
+           # Create a new phase
+           $phase = {
+               type => "blob",
+               name => $p->{name},
+               params => [ $p ],
+               xdr_size => $p->{xdr_size},
+           };
+           push @phases, $phase;
+
+           # We don't want to be asking recvmsg() for one object at a time if
+           # they're really small.
+           $phase->{size} = $p->{xdr_size};
+           $phase = 0;
        } elsif ($p->{class} eq "bulk") {
            $have_bulk = 1;
 
            };
            push @phases, $phase;
 
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               ;
-           } else {
-               $want_item = 1;
-           }
+           $want_item = 1;
 
            # We don't want to be asking recvmsg() for one object at a time if
            # they're really small.
        }
     }
 
+    if ($func->{split} && $subname eq "request") {
+       $phase = {
+           type => "split",
+           size => "py_rxgen_split_receive(call)",
+           params => []
+       };
+       push @phases, $phase;
+       $phase = 0;
+       $have_bulk = 1;
+    }
+
     # Function definition and arguments
     print PYOUT "\n";
     print PYOUT "int py_", $func->{name}, "_decode_", $subname, "(struct rx_call *call)\n";
     print PYOUT "{\n";
 
-    unless (@params) {
+    unless (@params || $func->{split}) {
        print PYOUT "\treturn 0;\n";
        print PYOUT "}\n";
        return;
     }
 
     # Local variables
-    print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n";
+    print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n"
+       if (@params);
     print PYOUT "\tPyObject *item;\n" if ($want_item);
     print PYOUT "\tunsigned phase = call->phase;\n";
     print PYOUT "\tunsigned count;\n";
     print PYOUT "\n";
     print PYOUT "select_phase:\n" if ($have_bulk);
     print PYOUT "\tcount = call->data_count;\n";
+    #print PYOUT "\tprintf(\"-- Phase %u (%u) --\\n\", phase, count);\n";
     print PYOUT "\tswitch (phase) {\n";
 
     print PYOUT "\tcase 0:\n";
        }
 
        # Determine how big bulk objects are
-       if ($phase->{type} eq "bulk") {
+       if ($phase->{type} eq "blob") {
            my $p = $phase->{params}->[0];
-           if ($p->{elem}->{class} eq "basic" ||
-               $p->{elem}->{class} eq "struct") {
-               print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n";
-               print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
-               print PYOUT "\t\t\treturn -1;\n";
-           } elsif ($p->{elem}->{class} eq "string") {
-               print PYOUT "\t\tobj->x.", $p->{name}, " = PyUnicode_New(call->bulk_count, 255);\n";
-               print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
-               print PYOUT "\t\t\treturn -1;\n";
-               print PYOUT "\t\tcall->bulk_item = obj->x.", $p->{name}, ";\n";
+           if ($p->{elem}->{class} eq "string") {
+               print PYOUT "\t\tswitch (py_dec_init_string(call, &obj->x.", $p->{name}, ")) {\n";
            } elsif ($p->{elem}->{class} eq "opaque") {
                print PYOUT "\t\tobj->x.", $p->{name}, " = PyByteArray_FromStringAndSize(\"\", 0);\n";
                print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
                print PYOUT "\t\t\treturn -1;\n";
-               print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->bulk_count) == -1)\n";
+               print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->blob_size) == -1)\n";
+               print PYOUT "\t\t\treturn -1;\n";
+
+               print PYOUT "\t\tswitch (py_dec_init_opaque(call, obj->x.", $p->{name}, ")) {\n";
+           } else {
+               die;
+           }
+           print PYOUT "\t\tcase -1: return -1;\n";
+           print PYOUT "\t\tcase  0: goto phase_", $phix + 1, ";\n";
+           print PYOUT "\t\tcase  1: break;\n";
+           print PYOUT "\t\t}\n";
+           $phase_goto_label = $phix + 1;
+       } elsif ($phase->{type} eq "bulk") {
+           my $p = $phase->{params}->[0];
+           if ($p->{elem}->{class} eq "basic" ||
+               $p->{elem}->{class} eq "struct") {
+               print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n";
+               print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
                print PYOUT "\t\t\treturn -1;\n";
            } else {
                die;
        }
 
        # Entry point for a phase
-       print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n";
+       if ($phase->{type} ne "split") {
+           print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n";
+       } else {
+           print PYOUT "\t\tif (py_rxgen_split_receive(call, 1) < 0)\n";
+           print PYOUT "\t\t\treturn -1;\n";
+           print PYOUT "\t\tif (call->need_size == 0)\n";
+           print PYOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+           $phase_goto_label = $phix + 1;
+       }
        print PYOUT "\t\tcall->phase = ", $phix, ";\n";
        print PYOUT "\tcase ", $phix, ":\n";
 
-       print PYOUT "\t\tif (count < ", $phase->{size}, ")\n";
-       print PYOUT "\t\t\treturn 1;\n";
+       if ($phase->{type} ne "split") {
+           print PYOUT "\t\tif (count < call->need_size)\n";
+           print PYOUT "\t\t\treturn 1;\n";
+       } else {
+           print PYOUT "\t\tif (call->need_size == UINT_MAX ? count == 0 : count < call->need_size) {\n";
+           #print PYOUT "\t\t\tprintf(\"NEED %u (phase %u)\\n\", call->need_size, phase);\n";
+           print PYOUT "\t\t\treturn 1;\n";
+           print PYOUT "\t\t}\n";
+       }
 
        # Unmarshal the data
        print PYOUT "\n";
        foreach my $p (@{$phase->{params}}) {
-           if ($p->{type} eq "bulk_size") {
+           if ($p->{type} eq "blob_size") {
+               print PYOUT "\t\tcall->blob_size = rxrpc_dec(call);\n";
+               next;
+           } elsif ($p->{type} eq "bulk_size") {
                print PYOUT "\t\tcall->bulk_count = rxrpc_dec(call);\n";
                next;
            }
 
-           if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
-                                         $p->{elem}->{class} eq "struct")
-               ) {
+           if ($p->{class} eq "bulk") {
                if ($p->{elem}->{class} eq "struct") {
                    print PYOUT "\t\titem = py_decode_", $p->{type}, "(call);\n";
                } elsif ($p->{elem}->{xdr_size} == 4 && $p->{type} =~ /^u/) {
                print PYOUT "\t\t\treturn -1;\n";
                print PYOUT "\t\tcall->bulk_index++;\n";
 
-           } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") {
-               print PYOUT "\t\tpy_dec_string(call);\n";
-           } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") {
-               print PYOUT "\t\tpy_dec_opaque(call, obj->x.", $p->{name}, ");\n";
+           } elsif ($p->{class} eq "blob") {
+               if ($p->{elem}->{class} eq "string") {
+                   print PYOUT "\t\tswitch (py_dec_into_string(call)) {\n";
+               } else {
+                   print PYOUT "\t\tswitch (py_dec_into_buffer(call)) {\n";
+               }
+               print PYOUT "\t\tcase -1: return -1;\n";
+               print PYOUT "\t\tcase  0: break;\n";
+               print PYOUT "\t\tcase  1: phase = ", $phix, "; goto select_phase;\n";
+               print PYOUT "\t\t}\n";
            } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
                print PYOUT "\t\tobj->x.", $p->{name}, " = (", $p->{type}, ")rxrpc_dec(call);\n";
            } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) {
                die $p->{where}, ": Unsupported type in decode";
            }
 
-           if ($p->{class} eq "bulk") {
+           if ($p->{class} eq "blob" &&  $p->{elem}->{class} eq "string") {
+               print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+               print PYOUT "\t\t\treturn -1;\n";
+               print PYOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+               print PYOUT "\t\t\tphase = ", $phix, ";\n";
+               print PYOUT "\t\t\tgoto select_phase;\n";
+               print PYOUT "\t\t}\n";
+           } elsif ($p->{class} eq "bulk") {
                print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
                print PYOUT "\t\t\treturn -1;\n";
                print PYOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
            }
        }
 
-       if ($phase->{type} ne "bulk") {
+       if ($phase->{type} eq "split") {
+           print PYOUT "\t\tswitch (py_rxgen_split_receive(call, 0)) {\n";
+           print PYOUT "\t\tcase -1: return -1;\n";
+           print PYOUT "\t\tcase  0: break;\n";
+           print PYOUT "\t\tcase  1: phase = ", $phix, "; goto select_phase;\n";
+           print PYOUT "\t\t}\n";
+           #print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+           #print PYOUT "\t\t\treturn -1;\n";
+           #print PYOUT "\t\tif (call->need_size != 0) {\n";
+           #print PYOUT "\t\t\tphase = ", $phix, ";\n";
+           #print PYOUT "\t\t\tgoto select_phase;\n";
+           #print PYOUT "\t\t}\n";
+       }
+
+       if ($phase->{type} ne "bulk" && $phase->{type} ne "blob") {
            print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
            print PYOUT "\t\t\treturn -1;\n";
        }
 
 #
 # Each type is specified by a hash of the following elements:
 #
-#      class           Complexity class (basic, string, struct, array, bulk)
+#      class           Complexity class (basic, string, struct, blob, array, bulk)
 #      type            Basic/struct type (char, {u,}int{8,16,32,64}_t, opaque, struct name)
 #      elem            Ref to element type def (if array/bulk)
 #      multi           1 if array or bulk type, 0 otherwise
 
     my %combined = %{$type};
 
+    if (exists $flags->{class} && $flags->{class} eq "bulk") {
+       if ($type->{class} eq "string" ||
+           $type->{class} eq "opaque") {
+           $flags->{class} = "blob";
+       }
+    }
+
     if (exists $flags->{class} &&
-       ($flags->{class} eq "bulk" ||
+       ($flags->{class} eq "blob" ||
+        $flags->{class} eq "bulk" ||
         $flags->{class} eq "array")) {
        die $where, ": Typedef'ing array/bulk as array/bulk not supported\n"
            if ($type->{multi});
            if ($line =~ /([a-zA-Z_][a-zA-Z0-9_]*) ([a-zA-Z_][a-zA-Z0-9_]*);/) {
                my %member = %{look_up_type($1)};
                die $where, ": Don't support bulk constructs in structs\n"
-                   if ($member{class} eq "bulk");
+                   if ($member{class} eq "bulk" || $member{class} eq "blob");
                $member{name} = $2;
                $member{where} = $where;
                push $struct->{members}, \%member;
                if ($bulk_dim) {
                    die $where, ": Bulk-of-bulk parameters not supported\n"
                        if ($param{class} eq "bulk");
-                   $param{class} = "bulk";
+                   if ($type->{class} eq "string" ||
+                       $type->{class} eq "opaque") {
+                       $param{class} = "blob";
+                   } else {
+                       $param{class} = "bulk";
+                   }
                    $param{elem} = $type;
                    $param{dim} = $bulk_dim unless $bulk_dim eq -1;
                }
            ;
        } elsif ($p->{class} eq "struct") {
            die unless (exists $p->{xdr_size});
+       } elsif ($p->{class} eq "blob") {
+           die $p->{where}, ": No element type" unless (exists $p->{elem});
+           if (exists $p->{dim}) {
+               die $where, ": Missing constant ", $p->{dim} unless exists $constants{$p->{dim}};
+           }
        } elsif ($p->{class} eq "bulk") {
            die $p->{where}, ": No element type" unless (exists $p->{elem});
            die $p->{where}, ": Element has no XDR size" unless (exists $p->{elem}->{xdr_size});
 
--- /dev/null
+#
+# AFS Server management toolkit: Server log fetcher
+# -*- coding: utf-8 -*-
+#
+
+__copyright__ = """
+Copyright (C) 2014 Red Hat, Inc. All Rights Reserved.
+Written by David Howells (dhowells@redhat.com)
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public Licence version 2 as
+published by the Free Software Foundation.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public Licence for more details.
+
+You should have received a copy of the GNU General Public Licence
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+
+from afs.exception import AFSException
+from afs.argparse import *
+from afs.lib.output import *
+import kafs
+import os
+
+help = "Print a server process's log file"
+
+command_arguments = [
+    [ "server",         get_bosserver,          "rs",         "<machine name>" ],
+    [ "file",           get_file_name,          "rs",         "<log file to examine>" ],
+    [ "cell",           get_cell,               "os",         "<cell name>" ],
+    [ "noauth",         get_auth,               "fn" ],
+    [ "localauth",      get_auth,               "fn" ],
+    [ "verbose",        get_verbose,            "fn" ],
+    [ "encrypt",        get_dummy,              "fn" ],
+]
+
+cant_combine_arguments = [
+    ( "cell",           "localauth" ),
+    ( "noauth",         "localauth" ),
+]
+
+argument_size_limits = {
+    "file"              : kafs.BOZO_BSSIZE,
+}
+
+description = r"""
+Print a server process's log file
+"""
+
+class split_handler:
+    """Split-call handler that writes the received log data to the output.
+
+    The transmit side sends nothing.  The receive side is a small state
+    machine keyed on split_info.state that pulls the response in chunks of
+    at most 4096 bytes and passes each chunk to output_raw().
+    """
+
+    def __init__(self):
+        pass
+
+    def transmit(self, split_info):
+        """There is no split data to send for this operation; returns None."""
+        return None
+
+    # Receive state machine.  Returns True if more data is required to be read
+    # or another state must be transited to and False if the operation is now
+    # complete.
+    #
+    def receive(self, split_info):
+        # We want to receive everything we can - the entire response phase
+        # belongs to the split handler
+        if split_info.state == 0:
+            split_info.will_recv_all()
+            split_info.state = 1
+            return True
+
+        if split_info.state == 1:
+            avail = split_info.data_available()
+            # Identity test: None is a sentinel distinct from a count of 0.
+            if avail is None:
+                # Last byte read
+                return False
+            if avail == 0:
+                split_info.will_recv_all()
+                return True
+
+            # Never ask for more than 4096 bytes in one chunk.
+            buf = bytearray(min(avail, 4096))
+            split_info.target = buf
+            split_info.state = 2
+
+            # Request reception start.  The filled buffer is consumed by the
+            # state 3 branch below; presumably the receive machinery advances
+            # the state from 2 to 3 before re-entering -- TODO confirm.
+            # receive_failed() will be called instead upon failure.
+            return split_info.begin_recv(buf, False)
+
+        if split_info.state == 3:
+            buf = split_info.target
+            split_info.target = None
+
+            # Strip any terminal NUL chars
+            output_raw(buf.rstrip(b"\0"))
+
+            # Go back and look for more data.
+            split_info.state = 1
+            return True
+
+        raise AFSException("Unexpected receive phase ", split_info.phase,
+                           " in getlog() split.receive()")
+
+    def receive_failed(self, split_info):
+        """Report the failure and drop the reference to the receive buffer."""
+        print("Receive failed in phase", split_info.phase)
+        split_info.target = None
+
+def main(params):
+    """Fetch a log file from a bos server via a split BOZO_GetLog call.
+
+    Opens a bos server connection through params["cell"], announces the
+    fetch, then issues the call with a split_handler instance as the split
+    callback object.
+    """
+    bos_conn = params["cell"].open_bos_server(params["server"], params)
+    handler = split_handler()
+
+    # Flush the announcement before the call is made.
+    output("Fetching log file '", params["file"], "'...\n")
+    output_flush()
+    kafs.BOZO_GetLog(bos_conn, params["file"], handler)
 
         self.__size = size
         self.__pos = 0
 
-    def client_transmit(self, split_call):
+    def transmit(self, split_info):
         while self.__pos < self.__size:
             verbose("--- XMIT ", self.__pos, "/", self.__size, "\n")
             size = self.__size - self.__pos
             buf = self.__fd.read(size)
             if len(buf) < size:
                 raise IOError("Short read on file " + self.__name)
-            split_call.send(buf, self.__pos < self.__size)
+            split_info.send(buf, self.__pos < self.__size)
         return True
 
-    def client_receive(self, split_call):
-        return True
+    def receive(self, split_info, inited):
+        return None
 
 def main(params):
     cell = params["cell"]
 
     for i in args:
         sys.stdout.write(str(i))
 
+def output_raw(*args):
+    """Write each argument, in order, to sys.stdout.buffer without converting it."""
+    for chunk in args:
+        sys.stdout.buffer.write(chunk)
+
+def output_flush():
+    """Flush sys.stdout."""
+    stream = sys.stdout
+    stream.flush()
+
 def outputf(formatstr, *args):
     sys.stdout.write(formatstr.format(*args))
 
 
     except KeyboardInterrupt:
         sys.exit(1)
     except SystemExit:
-        pass
+        sys.exit(0)
     except:
         print('Unhandled exception:', file=sys.stderr)
         traceback.print_exc()