]> www.infradead.org Git - users/dhowells/kafs-utils.git/commitdiff
Implement "bos getlog" and reengineer the decode path somewhat
authorDavid Howells <dhowells@redhat.com>
Thu, 1 May 2014 14:53:03 +0000 (15:53 +0100)
committerDavid Howells <dhowells@redhat.com>
Thu, 1 May 2014 14:53:03 +0000 (15:53 +0100)
Implement the "bos getlog" subcommand - which requires making the receive side
of split RPC function work.  To this end, the following reengineering has been
done to the decode side:

 (1) Lift the requirement that the Rx buffer cursor be 4-byte aligned as
     BOZO.GetLog() doesn't align the log data it returns.

 (2) Permit a request for an unknown - and possibly zero - amount of data to be
     set in call->need_size.  BOZO.GetLog() doesn't tell you in advance how
     much data it will return: the data stops when the Rx receive phase ends
     with LAST_PACKET being flagged.

 (3) Separate blob decoding (strings & opaques) from bulk decoding in the
     member variables of the rx_call struct, where a bulk decode is now a
     sequence of blobs, structs or ints.

 (4) Handle blob decoding asynchronously, where the buffer into which a blob is
     being written may not represent contiguous memory.  This is done by
     setting up with an init function, called once, and a decode-into function,
     called repeatedly whilst it returns 1.  The function is only called when
     there is sufficient data in the receive buffers.

 (5) Decode padding asynchronously by working out up front for a blob how much
     padding it requires and then decoding it as its own blob at the end using
     a special source buffer as a marker to switch processing.

 (6) Perform split reception by adding additional states within the decode
     state machine to call out to the handler functions.

The receive() method of the split_handler class provided to a split RPC
function then implements its own state machine on top of the decoder state
machine.

Further:

 (*) rxrpc_recv_data() now reports ENODATA rather than EMSGSIZE if the receive
     phase ends with short data.

 (*) MSG_MORE handling has separate rx_call struct members for the send and
     receive phase now to avoid confusion.

 (*) rxrpc_enc/dec_slow() now use rxrpc_enc/dec_blob() to avoid duplicating a
     lot of code.

 (*) rxrpc_enc/dec() now also go to the slow path if the cursor is misaligned,
     which may mean the data to be read is split across buffers.

 (*) The rx_call struct now has a pointer to a cleanup function to clean up the
     decoder state at the end to deal with aborted calls.

 (*) rxrpc_post_dec() reduces call->need_size as well as call->data_count as we
     don't want to be waiting for N bytes to turn up for a size-N blob if we
     have already received N-1 bytes of it and only need one more byte.

     This does mean that call->need_size may need resetting more often.

 (*) Added an output_raw() python output method to permit bytearray objects to
     be printed.  Conversion to a string means that control characters and
     quotes get escaped (eg. a newline char gets converted to \n).

 (*) Catching the SystemExit exception should not produce an error due to ret
     not existing as a variable.

Signed-off-by: David Howells <dhowells@redhat.com>
13 files changed:
af_rxrpc.c
py_rxgen.c
py_rxgen.h
py_rxsplit.c
rxgen.h
rxgen/emit_c_sync_funcs.pm
rxgen/emit_py_module.pm
rxgen/emit_py_sync_funcs.pm
rxgen/rxgen.pl
suite/commands/bos/getlog.py [new file with mode: 0644]
suite/commands/bos/install.py
suite/lib/output.py
suite/main.py

index 69ce2fcca2ca4681586dd9dd25158ede0a68881b..91a671c8a170f9fd0efb5b74d0286f01c2a6b628 100644 (file)
@@ -16,6 +16,7 @@
 #include <unistd.h>
 #include <poll.h>
 #include <errno.h>
+#include <limits.h>
 #include <sys/socket.h>
 #include "af_rxrpc.h"
 #include "rxgen.h"
@@ -26,6 +27,8 @@
 
 #define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
 
+uint32_t rxgen_dec_padding_sink;
+
 /*
  * dump the control messages
  */
@@ -283,7 +286,8 @@ struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn,
 
        call->magic = RXGEN_CALL_MAGIC;
        call->conn = z_conn;
-       call->more = 1;
+       call->more_send = true;
+       call->more_recv = true;
        call->buffer_head = buf;
        call->buffer_tail = buf;
        call->data_start = data;
@@ -393,7 +397,7 @@ more_to_send:
                iov[ioc].iov_base = cursor->buf + io_cursor;
                iov[ioc].iov_len = end - io_cursor;
                if (cursor == call->buffer_tail) {
-                       if (!call->more)
+                       if (!call->more_send)
                                more = 0;
                        ioc++;
                        break;
@@ -445,10 +449,10 @@ more_to_send:
 
        if (call->data_count > 0)
                goto more_to_send;
-       if (call->data_count == 0 && !call->more)
+       if (call->data_count == 0 && !call->more_send)
                call->state++;
 
-       if (!call->more) {
+       if (!call->more_send) {
                if (call->state == rx_call_cl_encoding_params ||
                    call->state == rx_call_sv_encoding_response)
                        call->state++;
@@ -494,7 +498,7 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait)
        msg.msg_flags   = 0;
 
        ret = recvmsg(z_conn->fd, &msg, MSG_PEEK | (nowait ? MSG_DONTWAIT : 0));
-       debug("RECVMSG: %d\n", ret);
+       debug("RECVMSG PEEK: %d\n", ret);
        if (ret == -1)
                return -1;
 
@@ -571,7 +575,7 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait)
        if (ret == -1)
                return -1;
 
-       debug("RECV: %d [fl:%d]\n", ret, msg.msg_flags);
+       debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags);
        debug("CMSG: %zu\n", msg.msg_controllen);
        debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len);
 
@@ -607,14 +611,14 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait)
        rxrpc_check_call(call);
 
        for (cursor = call->buffer_head; cursor; cursor = cursor->next)
-               debug("Recv buf=%p data=%p ioc=%u\n",
+               debug("RecvQ buf=%p data=%p iocur=%u\n",
                      cursor, cursor->buf, cursor->io_cursor);
 
        /* Process the metadata */
        if (msg.msg_flags & MSG_EOR)
                call->known_to_kernel = 0;
        if (!(msg.msg_flags & MSG_MORE))
-               call->more = 0;
+               call->more_recv = false;
 
        for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
                unsigned char *p;
@@ -690,10 +694,16 @@ loop:
        case rx_call_cl_decoding_response:
        case rx_call_sv_decoding_opcode:
        case rx_call_sv_decoding_params:
-               if (call->data_count < call->need_size) {
+               if (call->need_size == UINT_MAX) {
+                       /* A split mode call wants everything as it comes in,
+                        * and is willing to settle for nothing also.
+                        */
+                       if (call->data_count <= 0 && call->more_recv)
+                               return 0;
+               } else if (call->data_count < call->need_size) {
                        if (!(msg.msg_flags & MSG_MORE)) {
                                /* Short data */
-                               call->error_code = EMSGSIZE;
+                               call->error_code = ENODATA;
                                rxrpc_abort_call(call, 1);
                        }
                        return 0;
@@ -766,6 +776,7 @@ void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code)
        unsigned char control[128];
 
        if (call->known_to_kernel) {
+               memset(control, 0, sizeof(control));
                ctrllen = 0;
                RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
                RXRPC_ADD_ABORT(control, ctrllen, abort_code);
@@ -802,6 +813,8 @@ void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code)
                free(cursor->buf);
                free(cursor);
        }
+       if (call->decoder_cleanup)
+               call->decoder_cleanup(call);
        free(call);
 }
 
@@ -841,96 +854,6 @@ int rxrpc_run_sync_call(struct rx_call *call)
        }
 }
 
-/*
- * Slow path for decoding from data read from the buffer list.
- */
-void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data)
-{
-       struct rx_buf *cursor, *new;
-       uint8_t *buf;
-
-       rxrpc_check_call(call);
-
-       if (call->error_code)
-               return;
-       if (call->data_cursor != call->data_stop)
-               abort();
-       rxrpc_post_enc(call);
-
-       new = calloc(1, sizeof(struct rx_buf));
-       if (!new)
-               goto handle_oom;
-       new->buf = buf = malloc(RXGEN_BUFFER_SIZE);
-       if (!buf)
-               goto handle_oom_buf;
-       new->magic = RXGEN_BUF_MAGIC;
-
-       *(net_xdr_t *)buf = data;
-
-       cursor = call->buffer_tail;
-       cursor->next = new;
-       call->data_cursor = call->data_start = buf + sizeof(net_xdr_t);
-       call->data_stop = buf + RXGEN_BUFFER_SIZE;
-       call->buffer_space += RXGEN_BUFFER_SIZE - sizeof(net_xdr_t);
-       call->buffer_tail = new;
-       return;
-
-handle_oom_buf:
-       new->magic = RXGEN_BUF_DEAD;
-       free(new);
-handle_oom:
-       call->error_code = ENOMEM;
-}
-
-/*
- * Slow path for decoding from data read from the buffer list.
- */
-uint32_t rxrpc_dec_slow(struct rx_call *call)
-{
-       struct rx_buf *cursor, *spent;
-       net_xdr_t x;
-       unsigned count;
-       uint8_t *new_stop;
-
-       rxrpc_check_call(call);
-       rxrpc_post_dec(call);
-
-       /* More data may have come into a partially filled buffer from which we
-        * previously read.
-        */
-       cursor = call->buffer_head;
-       count = cursor->io_cursor & ~3;
-       new_stop = cursor->buf + count;
-       if (call->data_stop >= new_stop) {
-               /* This buffer must then be completely used as we're required to check
-                * amount received before reading it.
-                */
-               if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
-                       abort(); /* Didn't completely consume a buffer */
-               if (call->buffer_tail == cursor)
-                       abort(); /* Unexpectedly out of data */
-
-               /* Move to the next buffer */
-               spent = cursor;
-               cursor = cursor->next;
-               call->buffer_head = cursor;
-
-               call->data_cursor = call->data_start = cursor->buf;
-               count = cursor->io_cursor & ~3;
-               if (count == 0)
-                       abort();
-               new_stop = cursor->buf + count;
-               spent->magic = RXGEN_BUF_DEAD;
-               free(spent->buf);
-               free(spent);
-       }
-
-       call->data_stop = new_stop;
-       x = *(net_xdr_t *)call->data_cursor;
-       call->data_cursor += sizeof(x);
-       return ntohl(x);
-}
-
 /*
  * Discard excess received data
  */
@@ -965,7 +888,7 @@ int rxgen_dec_discard_excess(struct rx_call *call)
 /*
  * Encode a string of bytes
  */
-int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size)
+int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size)
 {
        struct rx_buf *cursor, *new;
        uint8_t *buf;
@@ -1020,35 +943,65 @@ handle_oom:
 }
 
 /*
- * Decode a string of bytes
+ * Slow path for decoding from data read from the buffer list.
  */
-void rxrpc_dec_bytes(struct rx_call *call)
+void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data)
 {
-       struct rx_buf *cursor, *spent;
-       unsigned needed, count;
-       uint8_t *new_stop;
+       rxrpc_enc_blob(call, &data, sizeof(data));
+}
+
+/*
+ * Decode a blob.
+ *
+ * This works progressively and so may need to be called multiple times for any
+ * particular set.
+ *
+ * call->blob_size indicates how many bytes we need to extract in total and
+ * call->blob_offset indicates how many we have done so far.  blob_offset is
+ * updated before returning.
+ */
+void rxrpc_dec_blob(struct rx_call *call)
+{
+       unsigned needed, segment;
 
        rxrpc_check_call(call);
        rxrpc_post_dec(call);
 
-       needed = call->bulk_count - call->bulk_index;
-       count = call->data_stop - call->data_cursor;
-       if (count > 0) {
-               if (count > needed)
-                       count = needed;
-               memcpy(call->bulk_item + call->bulk_index, call->data_cursor, count);
-               call->bulk_index += count;
-               call->data_cursor += count;
-               if (call->bulk_index >= call->bulk_count)
+       needed = call->blob_size - call->blob_offset;
+       segment = call->data_stop - call->data_cursor;
+       debug("DEC BLOB dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment);
+
+       if (segment > 0) {
+               if (segment > needed)
+                       segment = needed;
+
+               debug("DEC BLOB copy %u\n", segment);
+               memcpy(call->blob + call->blob_offset, call->data_cursor, segment);
+               call->blob_decoded += segment;
+               call->blob_offset += segment;
+               call->data_cursor += segment;
+               if (call->blob_offset >= call->blob_size)
                        return;
        }
 
+       rxrpc_dec_advance_buffer(call);
+}
+
+/*
+ * Progress the buffer chain during decoding.
+ */
+void rxrpc_dec_advance_buffer(struct rx_call *call)
+{
+       struct rx_buf *cursor, *spent;
+       unsigned segment;
+       uint8_t *new_stop;
+
        /* More data may have come into a partially filled buffer from which we
         * previously read.
         */
        cursor = call->buffer_head;
-       count = cursor->io_cursor & ~3;
-       new_stop = cursor->buf + count;
+       segment = cursor->io_cursor;
+       new_stop = cursor->buf + segment;
        if (call->data_stop >= new_stop) {
                /* This buffer must then be completely used as we're required to check
                 * amount received before reading it.
@@ -1059,15 +1012,16 @@ void rxrpc_dec_bytes(struct rx_call *call)
                        abort(); /* Unexpectedly out of data */
 
                /* Move to the next buffer */
+               rxrpc_post_dec(call);
                spent = cursor;
                cursor = cursor->next;
                call->buffer_head = cursor;
 
                call->data_cursor = call->data_start = cursor->buf;
-               count = cursor->io_cursor & ~3;
-               if (count == 0)
+               segment = cursor->io_cursor;
+               if (segment == 0)
                        abort();
-               new_stop = cursor->buf + count;
+               new_stop = cursor->buf + segment;
 
                spent->magic = RXGEN_BUF_DEAD;
                free(spent->buf);
@@ -1077,3 +1031,33 @@ void rxrpc_dec_bytes(struct rx_call *call)
        call->data_stop = new_stop;
        rxrpc_check_call(call);
 }
+
+/*
+ * Slow path for decoding a 4-byte integer read from the buffer list.
+ */
+uint32_t rxrpc_dec_slow(struct rx_call *call)
+{
+       net_xdr_t x;
+       uint32_t ret;
+
+       debug("DEC SLOW %lu %ld\n",
+             (unsigned long)call->data_cursor & 3,
+             call->data_cursor - call->data_stop);
+
+       call->blob_size = 4;
+       call->blob_offset = 0;
+       call->blob = &x;
+
+       /* We should only come here if there is sufficient data in the
+        * buffers for us to complete all the copy operation(s).
+        */
+       if (call->data_count < call->blob_size)
+               abort();
+
+       while (call->blob_offset < call->blob_size)
+               rxrpc_dec_blob(call);
+
+       ret = ntohl(x);
+       debug("DEC SLOW = %x\n", ret);
+       return ret;
+}
index 3d733972504d8daf510c927902c9d5b551906b04..85ed6594c8dc119ec91373dba7bed39579c84f86 100644 (file)
 #include "afs_py.h"
 #include "rxgen.h"
 
+#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+
+struct py_dec_index {
+       unsigned long val;
+       void *ptr;
+};
+
+struct py_dec_manager {
+       Py_buffer view;
+       struct py_dec_index top;
+       struct py_dec_index indices[];
+};
+
 PyObject *py_rxgen_get_struct(const void *p, PyObject **cache,
                              PyObject *(*data_to_type)(const void *elem))
 {
@@ -682,90 +695,24 @@ int py_rxgen_initialise_members(PyObject *obj, PyObject *kw)
        return 0;
 }
 
-/*
- * Decode a string of bytes into a preallocated unicode string python object.
- */
-void py_dec_string(struct rx_call *call)
-{
-       PyObject *str = call->bulk_item;
-       struct rx_buf *cursor, *spent;
-       unsigned needed, count;
-       uint8_t *new_stop;
-
-       if (PyUnicode_KIND(str) != PyUnicode_1BYTE_KIND)
-               abort();
-
-       rxrpc_post_dec(call);
-
-       needed = call->bulk_count - call->bulk_index;
-       count = call->data_stop - call->data_cursor;
-       if (count > 0) {
-               if (count > needed)
-                       count = needed;
-               memcpy(PyUnicode_DATA(str) + call->bulk_index, call->data_cursor, count);
-               call->bulk_index += count;
-               call->data_cursor += count;
-               rxrpc_dec_align(call);
-               if (call->bulk_count <= call->bulk_index)
-                       return;
-               
-       }
-
-       /* More data may have come into a partially filled buffer from which we
-        * previously read.
-        */
-       cursor = call->buffer_head;
-       count = cursor->io_cursor & ~3;
-       new_stop = cursor->buf + count;
-       if (call->data_stop >= new_stop) {
-               /* This buffer must then be completely used as we're required to check
-                * amount received before reading it.
-                */
-               if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
-                       abort(); /* Didn't completely consume a buffer */
-               if (call->buffer_tail == cursor)
-                       abort(); /* Unexpectedly out of data */
-
-               /* Move to the next buffer */
-               spent = cursor;
-               cursor = cursor->next;
-               call->buffer_head = cursor;
-
-               call->data_cursor = call->data_start = cursor->buf;
-               count = cursor->io_cursor & ~3;
-               if (count == 0)
-                       abort();
-               new_stop = cursor->buf + count;
-
-               free(spent->buf);
-               free(spent);
-       }
-
-       call->data_stop = new_stop;
-}
-
 /*
  * Recursively encode a standard C array.
  */
 static int py_enc_c_array(struct rx_call *call,
                          const void *data, int dim,
                          const Py_ssize_t *dim_size,
-                         Py_ssize_t itemsize,
-                         int align)
+                         Py_ssize_t itemsize)
 {
        if (dim == 1) {
                /* Data subarray */
-               rxrpc_enc_bytes(call, data, *dim_size * itemsize);
-               if (align)
-                       rxrpc_enc_align(call);
+               rxrpc_enc_blob(call, data, *dim_size * itemsize);
                return rxrpc_post_enc(call);
        } else {
                /* Pointer subarray */
                const void *const *ptrs = data;
                Py_ssize_t i;
                for (i = 0; i < *dim_size; i++)
-                       if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize,
-                                          align) < 0)
+                       if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0)
                                return -1;
                return 0;
        }
@@ -784,7 +731,7 @@ static int py_enc_numpy_array(struct rx_call *call,
 
        if (dim == 0)
                /* Single data item */
-               return rxrpc_enc_bytes(call, data, itemsize);
+               return rxrpc_enc_blob(call, data, itemsize);
 
        for (i = 0; i < *dim_size; i++) {
                if (py_enc_numpy_array(call, data + i * *dim_stride,
@@ -810,7 +757,7 @@ static int py_enc_pil_array(struct rx_call *call,
 
        if (dim == 0)
                /* Single data item */
-               return rxrpc_enc_bytes(call, data, itemsize);
+               return rxrpc_enc_blob(call, data, itemsize);
 
        for (i = 0; i < *dim_size; i++) {
                const void *ptr = data + i * *dim_stride;
@@ -832,22 +779,21 @@ static int py_enc_pil_array(struct rx_call *call,
  * Encode the just the contents of a python buffer view, without length or
  * realignment.
  */
-int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align)
+int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim)
 {
        if (call->error_code)
                return -1;
 
        if (view->ndim == 0 || (view->ndim == 1 && !view->shape)) {
                /* Simple scalar array */
-               if (rxrpc_enc_bytes(call, view->buf, view->len) < 0)
+               if (rxrpc_enc_blob(call, view->buf, view->len) < 0)
                        return -1;
                return rxrpc_post_enc(call);
        }
 
        if (!view->strides)
                /* Standard C array */
-               return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize,
-                                     align);
+               return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize);
 
        if (!view->suboffsets)
                return py_enc_numpy_array(call, view->buf, view->ndim, view->shape, view->strides,
@@ -863,17 +809,18 @@ int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int ali
  */
 int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim)
 {
+       static unsigned zero;
        int i;
 
        if (call->error_code)
                return -1;
 
        if (0) {
-               printf("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize);
+               debug("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize);
                if (view->shape)
                        for (i = 0; i < view->ndim; i++)
-                               printf(" %zd", view->shape[i]);
-               printf(" }\n");
+                               debug(" %zd", view->shape[i]);
+               debug(" }\n");
        }
 
        if (view->len > dim) {
@@ -882,167 +829,386 @@ int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim)
        }
 
        rxrpc_enc(call, view->len);
-       if (py_enc_buffer_raw(call, view, dim, true) == -1)
+       if (py_enc_buffer_raw(call, view, dim) == -1)
                return -1;
 
-       rxrpc_enc_align(call);
+       if (view->len & 3)
+               rxrpc_enc_blob(call, &zero, 4 - (view->len & 3));
        return rxrpc_post_enc(call);
 }
 
 /*
- * Recursively decode a standard C array.
+ * Recursively decode into a standard C array.
  */
-static int py_dec_c_array(struct rx_call *call,
-                         void *data, int dim,
-                         const Py_ssize_t *dim_size,
-                         Py_ssize_t itemsize)
+static int py_dec_c_array(struct rx_call *call, struct py_dec_manager *manager)
 {
-       if (dim == 1) {
-               /* Data subarray */
-               call->bulk_item = data;
-               call->bulk_count = *dim_size * itemsize;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               return rxrpc_post_dec(call);
-       } else {
-               /* Pointer subarray */
-               void **ptrs = data;
-               Py_ssize_t i;
-               for (i = 0; i < *dim_size; i++)
-                       if (py_dec_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0)
-                               return -1;
-               return 0;
+       unsigned long val;
+       int i;
+
+       /* First of all, we finish off the blob we're currently decoding
+        * into.
+        */
+       if (call->blob_size < manager->view.itemsize) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) < 0)
+                       return -1;
+               if (call->blob_size < manager->view.itemsize)
+                       goto need_more;
+       }
+
+       /* Now we increment the per-dimension counters, overflowing and
+        * wrapping when a counter reaches the limit.  When the outermost
+        * wraps, we are done.
+        */
+       for (i = manager->view.ndim - 1; i >= 0; i--) {
+               val = manager->indices[i].val;
+               if (val + 1 < manager->view.shape[i]) {
+                       manager->indices[i].val = val + 1;
+                       goto incremented;
+               }
+               manager->indices[i].val = 0;
        }
+
+       return 0; /* Complete */
+
+incremented:
+       /* Recalculate the cached pointers */
+       for (; i <= manager->view.ndim - 1; i++) {
+               void **ptrs = manager->indices[i - 1].ptr;
+               void *ptr = ptrs[manager->indices[i].val];
+               manager->indices[i].ptr = ptr;
+       }
+
+       call->blob_offset = 0;
+       call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+       return 1;
 }
 
 /*
- * Recursively decode a NumPy-style array.
+ * Recursively decode into a NumPy-style array.
  */
-static int py_dec_numpy_array(struct rx_call *call,
-                             void *data, int dim,
-                             const Py_ssize_t *dim_size,
-                             const Py_ssize_t *dim_stride,
-                             Py_ssize_t itemsize)
+static int py_dec_numpy_array(struct rx_call *call, struct py_dec_manager *manager)
 {
+       unsigned long val;
        int i;
 
-       if (dim == 0) {
-               /* Single data item */
-               call->bulk_item = data;
-               call->bulk_count = itemsize;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               return 0;
+       /* First of all, we finish off the blob we're currently decoding
+        * into.
+        */
+       if (call->blob_size < manager->view.itemsize) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) < 0)
+                       return -1;
+               if (call->blob_size < manager->view.itemsize)
+                       goto need_more;
        }
 
-       for (i = 0; i < *dim_size; i++) {
-               if (py_dec_numpy_array(call, data + i * *dim_stride,
-                                      dim - 1, dim_size + 1, dim_stride + 1, itemsize) < 0)
-                       return -1;
-               if (dim == 1 && rxrpc_post_dec(call) < 0)
-                       return -1;
+       /* Now we increment the per-dimension counters, overflowing and
+        * wrapping when a counter reaches the limit.  When the outermost
+        * wraps, we are done.
+        */
+       for (i = manager->view.ndim - 1; i >= 0; i--) {
+               val = manager->indices[i].val;
+               if (val + 1 < manager->view.shape[i]) {
+                       manager->indices[i].val = val + 1;
+                       goto incremented;
+               }
+               manager->indices[i].val = 0;
        }
-       return 0;
+
+       return 0; /* Complete */
+
+incremented:
+       /* Recalculate the cached pointers */
+       for (; i <= manager->view.ndim - 1; i++) {
+               void *ptr = manager->indices[i - 1].ptr;
+               ptr += manager->indices[i].val * manager->view.strides[i];
+               manager->indices[i].ptr = ptr;
+       }
+
+       call->blob_offset = 0;
+       call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+       return 1;
 }
 
 /*
- * Recursively decode a Python Imaging Library (PIL)-style array.
+ * Recursively decode into a Python Imaging Library (PIL)-style array.
  */
-static int py_dec_pil_array(struct rx_call *call,
-                           void *data, int dim,
-                           const Py_ssize_t *dim_size,
-                           const Py_ssize_t *dim_stride,
-                           const Py_ssize_t *dim_suboffset,
-                           Py_ssize_t itemsize)
+static int py_dec_pil_array(struct rx_call *call, struct py_dec_manager *manager)
 {
+       unsigned long val;
        int i;
 
-       if (dim == 0) {
-               /* Single data item */
-               call->bulk_item = data;
-               call->bulk_count = itemsize;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               return 0;
+       /* First of all, we finish off the blob we're currently decoding
+        * into.
+        */
+       if (call->blob_size < manager->view.itemsize) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) < 0)
+                       return -1;
+               if (call->blob_size < manager->view.itemsize)
+                       goto need_more;
        }
 
-       for (i = 0; i < *dim_size; i++) {
-               void *ptr = data + i * *dim_stride;
-               if (*dim_suboffset >= 0) {
+       /* Now we increment the per-dimension counters, overflowing and
+        * wrapping when a counter reaches the limit.  When the outermost
+        * wraps, we are done.
+        */
+       for (i = manager->view.ndim - 1; i >= 0; i--) {
+               val = manager->indices[i].val;
+               if (val + 1 < manager->view.shape[i]) {
+                       manager->indices[i].val = val + 1;
+                       goto incremented;
+               }
+               manager->indices[i].val = 0;
+       }
+
+       return 0; /* Complete */
+
+incremented:
+       /* Recalculate the cached pointers */
+       for (; i <= manager->view.ndim - 1; i++) {
+               void *ptr = manager->indices[i - 1].ptr;
+               ptr += manager->indices[i].val * manager->view.strides[i];
+               if (manager->view.suboffsets[i] >= 0) {
                        ptr = *((void *const *)ptr);
-                       ptr += *dim_suboffset;
+                       ptr += manager->view.suboffsets[i];
                }
-               if (py_dec_pil_array(call, ptr,
-                                    dim - 1, dim_size + 1, dim_stride + 1, dim_suboffset + 1,
-                                    itemsize) < 0)
-                       return -1;
-               if (dim == 1 && rxrpc_post_dec(call) < 0)
-                       return -1;
+               manager->indices[i].ptr = ptr;
        }
-       return 0;
+
+       call->blob_offset = 0;
+       call->blob = manager->indices[manager->view.ndim - 1].ptr;
+need_more:
+       return 1;
 }
 
 /*
  * Decode the contents of an opaque type
  */
-int py_dec_opaque(struct rx_call *call, PyObject *obj)
+int py_dec_into_buffer(struct rx_call *call)
 {
-       Py_buffer view;
-       int i;
+       struct py_dec_manager *manager = call->decoder_manager;
+       int ret;
 
        if (call->error_code)
                return -1;
 
-       if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0)
-               return -1;
+       if (call->blob == &rxgen_dec_padding_sink) {
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) == -1)
+                       return -1;
+               return (call->blob_offset == call->blob_size) ? 0 : 1;
+       }
 
-       if (0) {
-               printf("PEBUF: l=%zu isz=%zd {", view.len, view.itemsize);
-               if (view.shape)
-                       for (i = 0; i < view.ndim; i++)
-                               printf(" %zd", view.shape[i]);
-               printf(" }\n");
+       if (manager->view.ndim == 0 || (manager->view.ndim == 1 && !manager->view.shape)) {
+               /* Single scalar */
+               rxrpc_dec_blob(call);
+               if (rxrpc_post_dec(call) == -1)
+                       return -1;
+               ret = (call->blob_offset == call->blob_size) ? 0 : 1;
+       } else if (!manager->view.strides)
+               ret = py_dec_c_array(call, manager);
+       else if (!manager->view.suboffsets)
+               ret = py_dec_numpy_array(call, manager);
+       else
+               ret = py_dec_pil_array(call, manager);
+
+       if (ret <= 0) {
+               PyBuffer_Release(&manager->view);
+               free(call->decoder_manager);
+               call->decoder_manager = NULL;
        }
 
-       if (view.ndim == 0 || (view.ndim == 1 && !view.shape)) {
-               /* Simple scalar array */
-               call->bulk_item = view.buf;
-               call->bulk_count = view.len;
-               call->bulk_index = 0;
-               rxrpc_dec_bytes(call);
-               if (rxrpc_post_dec(call) < 0)
-                       goto error;
-               goto done;
+       if (ret == 0 && call->padding_size > 0) {
+               /* Soak up the padding to a 32-bit boundary */
+               call->blob = &rxgen_dec_padding_sink;
+               call->blob_size = 4 - (call->blob_size & 3);
+               call->blob_offset = 0;
+               return 1;
+       }
+
+       return ret;
+}
+
+/*
+ * Initialise buffer decode.
+ *
+ * Returns 0 if there's nothing to do, 1 if the decode is set up and -1 on error,
+ */
+int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded)
+{
+       struct py_dec_manager *manager = NULL;
+       size_t size;
+       int i;
+
+       if (call->decoder_manager) {
+               manager = call->decoder_manager;
+               PyBuffer_Release(&manager->view);
+               free(manager);
+               manager = NULL;
+               call->decoder_manager = NULL;
+       }
+
+       debug("INIT_BUFFER: l=%zu isz=%zd nd=%d\n", view->len, view->itemsize, view->ndim);
+       call->need_size = view->len;
+       if (view->len == 0) {
+               PyBuffer_Release(view);
+               return 0;
+       }
+
+       call->padding_size = padded ? 4 - (view->len & 3) : 0;
+
+       size = sizeof(struct py_dec_manager);
+       size += view->ndim * sizeof(struct py_dec_index);
+       manager = malloc(size);
+       if (!manager) {
+               PyBuffer_Release(view);
+               PyErr_NoMemory();
+               return -1;
        }
 
-       if (!view.strides) {
+       memset(manager, 0, size);
+       memcpy(&manager->view, view, sizeof(*view));
+       manager->indices[-1].ptr = manager->view.buf;
+
+       call->blob_size = manager->view.itemsize;
+
+       if (manager->view.ndim == 0) {
+               call->blob_size = manager->view.len;
+       } else if (manager->view.ndim == 1 && !manager->view.shape) {
+               manager->indices[0].ptr = manager->indices[-1].ptr;
+               call->blob_size = manager->view.len;
+       } else if (!manager->view.strides) {
                /* Standard C array */
-               if (py_dec_c_array(call, view.buf, view.ndim, view.shape, view.itemsize) < 0)
-                       goto error;
-               goto done;
+               for (i = 0; i <= manager->view.ndim - 1; i++) {
+                       void **ptrs = manager->indices[i - 1].ptr;
+                       void *ptr = ptrs[manager->indices[i].val];
+                       manager->indices[i].ptr = ptr;
+               }
+       } else if (!manager->view.suboffsets) {
+               /* NumPy-style Python array */
+               for (i = 0; i <= manager->view.ndim - 1; i++) {
+                       void *ptr = manager->indices[i - 1].ptr;
+                       ptr += manager->indices[i].val * manager->view.strides[i];
+                       manager->indices[i].ptr = ptr;
+               }
+       } else {
+               /* PIL-style Python array */
+               for (i = 0; i <= manager->view.ndim - 1; i++) {
+                       void *ptr = manager->indices[i - 1].ptr;
+                       ptr += manager->indices[i].val * manager->view.strides[i];
+                       if (manager->view.suboffsets[i] >= 0) {
+                               ptr = *((void *const *)ptr);
+                               ptr += manager->view.suboffsets[i];
+                       }
+                       manager->indices[i].ptr = ptr;
+               }
        }
 
-       if (!view.suboffsets) {
-               if (py_dec_numpy_array(call, view.buf, view.ndim, view.shape, view.strides,
-                                      view.itemsize) < 0)
-                       goto error;
-               goto done;
+       /* Set the first blob to be decoded */
+       call->blob = manager->indices[manager->view.ndim - 1].ptr;
+       call->blob_offset = 0;
+       call->decoder_manager = manager;
+       return 1;
+}
+
+/*
+ * Initialise the decoding of the contents of an opaque type
+ */
+int py_dec_init_opaque(struct rx_call *call, PyObject *obj)
+{
+       Py_buffer view;
+       int ret;
+
+       if (call->error_code)
+               return -1;
+
+       if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0) {
+               debug("*** GET BUFFER FAILED\n");
+               return -1;
        }
 
-       /* PIL-style Python array */
-       if (py_dec_pil_array(call, view.buf, view.ndim, view.shape, view.strides,
-                            view.suboffsets, view.itemsize) < 0)
-               goto error;
+       ret = py_dec_init_buffer(call, &view, true);
+       if (ret == -1)
+               return -1;
 
-done:
-       PyBuffer_Release(&view);
-       return call->error_code ? -1 : 0;
+       return (ret < 0 || call->error_code) ? -1 : 0;
+}
 
-error:
-       PyBuffer_Release(&view);
-       return -1;
+/*
+ * Decode a string of bytes into a preallocated unicode string python object.
+ */
+int py_dec_into_string(struct rx_call *call)
+{
+       PyObject *str = call->blob;
+       unsigned needed, segment, i;
+
+       rxrpc_post_dec(call);
+
+       needed = call->blob_size - call->blob_offset;
+       segment = call->data_stop - call->data_cursor;
+       debug("DEC STR dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment);
+
+       if (segment > 0) {
+               if (segment > needed)
+                       segment = needed;
+               if (call->blob != &rxgen_dec_padding_sink) {
+                       for (i = 0; i < segment; i++)
+                               PyUnicode_WRITE(PyUnicode_KIND(str), PyUnicode_DATA(str),
+                                               call->blob_offset + i, call->data_cursor[i]);
+               }
+               call->blob_decoded += segment;
+               call->blob_offset += segment;
+               call->data_cursor += segment;
+               if (call->blob_size <= call->blob_offset) {
+                       if (call->blob != &rxgen_dec_padding_sink && call->padding_size > 0) {
+                               /* Soak up the padding to a 32-bit boundary */
+                               call->blob = &rxgen_dec_padding_sink;
+                               call->blob_size = call->padding_size;
+                               call->blob_offset = 0;
+                               return 1;
+                       }
+                       return 0;
+               }
+       }
+
+       rxrpc_dec_advance_buffer(call);
+       return 1;
+}
+
+/*
+ * Decode a string of bytes into a preallocated unicode string python object.
+ */
+int py_dec_init_string(struct rx_call *call, PyObject **_str)
+{
+       PyObject *str;
+
+       debug("INIT STRING %u\n", call->blob_size);
+
+       str = PyUnicode_New(call->blob_size, 255);
+       if (!str)
+               return -1;
+       *_str = str;
+       if (call->blob_size == 0)
+               return 0;
+
+       if (PyUnicode_READY(str) < 0) {
+               debug("*** STRING NON-CANONICAL\n");
+               abort();
+       }
+
+       call->blob = str;
+       call->blob_offset = 0;
+       call->padding_size = 4 - (call->blob_size & 3);
+       return 1;
 }
 
+/*
+ * Comparator for binary searching the abort code table
+ */
 static int py_rxgen_received_abort_cmp(const void *key, const void *_entry)
 {
        const struct kafs_abort_list *entry = _entry;
@@ -1085,3 +1251,17 @@ PyObject *py_rxgen_received_abort(struct rx_call *call)
        else
                return PyErr_Format(ex, "Aborted %u", call->abort_code);
 }
+
+/*
+ * Clean up a call after decoding
+ */
+void py_rxgen_decoder_cleanup(struct rx_call *call)
+{
+       if (call->decoder_manager) {
+               struct py_dec_manager *manager = call->decoder_manager;
+               PyBuffer_Release(&manager->view);
+               free(manager);
+       }
+       Py_XDECREF(call->decoder_split_callback);
+       Py_XDECREF(call->decoder_split_info);
+}
index eb98fe5d1fc0787987378c7410399a8f81ce8a95..ffe8965c47a76ba93e21b7554cb41b33cf904caa 100644 (file)
@@ -12,6 +12,8 @@
 #ifndef _PY_RXGEN_H
 #define _PY_RXGEN_H
 
+#include "rxgen.h"
+
 struct py_rx_connection {
        PyObject_HEAD
        struct rx_connection *x;
@@ -39,10 +41,13 @@ enum py_rx_split_state {
        split_dead
 };
 
-struct py_rx_split_call {
+struct py_rx_split_info {
        PyObject_HEAD
        struct rx_call *call;
-       enum py_rx_split_state state;
+       PyObject *target;
+       enum py_rx_split_state split_state;
+       unsigned state;
+       bool receiving_data;
 };
 
 extern PyTypeObject py_rx_connectionType;
@@ -51,6 +56,7 @@ extern PyObject *kafs_py_rx_new_connection(PyObject *, PyObject *);
 extern PyObject *kafs_py_string_to_key(PyObject *, PyObject *);
 
 extern int py_rxgen_initialise_members(PyObject *obj, PyObject *kw);
+extern void py_rxgen_decoder_cleanup(struct rx_call *call);
 
 /*
  * Single embedded struct handling
@@ -67,10 +73,13 @@ extern int py_rxgen_premarshal_struct(void *p, size_t size, size_t offs,
  */
 extern PyObject *py_rxgen_get_string(const void *_p, size_t n);
 extern int py_rxgen_set_string(void *_p, size_t n, PyObject *val);
-extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align);
+extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim);
 extern int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim);
-extern void py_dec_string(struct rx_call *call);
-extern int py_dec_opaque(struct rx_call *call, PyObject *obj);
+extern int py_dec_into_buffer(struct rx_call *call);
+extern int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded);
+extern int py_dec_init_opaque(struct rx_call *call, PyObject *obj);
+extern int py_dec_into_string(struct rx_call *call);
+extern int py_dec_init_string(struct rx_call *call, PyObject **_str);
 
 /*
  * Embedded general array handling
@@ -118,15 +127,21 @@ extern PyObject *py_rxgen_received_abort(struct rx_call *call);
 /*
  * Split-mode RPC call handling
  */
-extern PyTypeObject py_rx_split_callType;
+extern PyTypeObject py_rx_split_infoType;
 extern PyObject *py_rxgen_split_client_prepare(void);
-extern int py_rxgen_split_client_transmit(PyObject *split, PyObject *split_call);
-extern int py_rxgen_split_client_receive(PyObject *split, PyObject *split_call);
+extern int py_rxgen_split_transmit(struct rx_call *call);
+extern int py_rxgen_split_receive(struct rx_call *call, bool init);
 
-static inline void py_rxgen_split_client_set(PyObject *_split_call, struct rx_call *call)
+static inline void py_rxgen_split_client_set(struct rx_call *call,
+                                            PyObject *split_callback,
+                                            PyObject *_split_info)
 {
-       struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
-       split_call->call = call;
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_split_info;
+
+       split_info->call = call;
+       Py_INCREF(split_callback);
+       call->decoder_split_callback = split_callback;
+       call->decoder_split_info = _split_info;
 }
 
 #endif /* _PY_RXGEN_H */
index fb6e17fae5dc2db65c080c973a52480259fb9ddd..ddfe85fd27794f62193f662f9e45f2ed62d76843 100644 (file)
 #include "afs_py.h"
 #include "rxgen.h"
 
+#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+
+static int py_rx_split_do_recv(struct rx_call *call,
+                              struct py_rx_split_info *split_info)
+{
+       struct pollfd fds[1];
+
+       fds[0].fd = call->conn->fd;
+       fds[0].events = POLLIN;
+       fds[0].revents = 0;
+       poll(fds, 1, 0);
+       if (fds[0].revents & POLLIN)
+               rxrpc_recv_data(call->conn, true);
+       return 0;
+}
+
+static int py_rx_split_do_send_recv(struct rx_call *call,
+                                   struct py_rx_split_info *split_info,
+                                   bool more)
+{
+       if (!more) {
+               call->more_send = 0;
+               split_info->split_state = split_idle;
+       }
+       if (rxrpc_send_data(call) == -1)
+               return -1;
+
+       return py_rx_split_do_recv(call, split_info);
+}
+
 /*
- * Send an RPC call: split.send(data, more)
+ * Send an RPC call: split_info.send(data, more)
  */
 static PyObject *py_rx_split_send(PyObject *_self, PyObject *args)
 {
-       struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
-       struct rx_call *call = self->call;
-       struct pollfd fds[1];
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
        Py_buffer data;
-       int more;
+       int more = 0;
 
        if (!call)
                abort();
 
-       if (self->state != split_transmitting)
+       if (split_info->split_state != split_transmitting)
                abort();
 
        if (!PyArg_ParseTuple(args, "y*p", &data, &more))
                return NULL;
 
-       if (py_enc_buffer_raw(call, &data, UINT_MAX, false) == -1)
+       if (py_enc_buffer_raw(call, &data, UINT_MAX) == -1)
                return PyErr_SetFromErrno(PyExc_OSError);
 
-       if (!more) {
-               call->more = 0;
-               self->state = split_idle;
-       }
-       if (rxrpc_send_data(call) == -1)
+       if (py_rx_split_do_send_recv(call, split_info, more) < 0)
                return PyErr_SetFromErrno(PyExc_OSError);
 
-       fds[0].fd = call->conn->fd;
-       fds[0].events = POLLIN;
-       fds[0].revents = 0;
-       poll(fds, 1, 0);
-       if (fds[0].revents & POLLIN)
-               rxrpc_recv_data(call->conn, true);
+       Py_RETURN_NONE;
+}
+
+/*
+ * Request reception of some data, where the amount of data is potentially
+ * zero.  This can only be used if there is no non-split data to be received
+ * after the split data as this commits the caller to handling all the
+ * remaining data in this phase.
+ *
+ * The caller should use split_info.data_available() to work out how much data
+ * is available, if any.
+ */
+static PyObject *py_rx_split_will_recv_all(PyObject *_self, PyObject *args)
+{
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
 
+       call->need_size = UINT_MAX;
        Py_RETURN_NONE;
 }
 
 /*
- * Recv an RPC call: v = split.recv(n)
+ * Receive a fixed-size object: ret = split_info.begin_recv(buffer)
+ *
+ * The object's intrinsic size is used to determine how much data we want for
+ * it.  Potentially 0-length, variable sized data cannot be handled this way.
+ *
+ * Returns NULL on error; True on success (ie. may be more to receive).
  */
-static PyObject *py_rx_split_recv(PyObject *_self, PyObject *args)
+static PyObject *py_rx_split_begin_recv(PyObject *_self, PyObject *args)
 {
-       struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
-       struct rx_call *call = self->call;
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
+       Py_buffer buffer;
+       int ret, padded = 1;
 
-       if (self->state != split_receiving)
+       if (!call)
+               abort();
+       if (split_info->split_state != split_receiving)
                abort();
 
-       abort();
-       return NULL;
+       if (!PyArg_ParseTuple(args, "w*|p", &buffer, &padded))
+               return NULL;
+
+       ret = py_dec_init_buffer(call, &buffer, padded);
+       switch (ret) {
+       case -1: return NULL;
+       case  0: split_info->state++; Py_RETURN_TRUE;
+       case  1: split_info->receiving_data = true; Py_RETURN_TRUE;
+       default: abort();
+       }
+}
+
+/*
+ * Query how much data is in the Rx buffers of an RPC call: n = split_info.data_available()
+ */
+static PyObject *py_rx_split_data_available(PyObject *_self, PyObject *args)
+{
+       struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self;
+       struct rx_call *call = split_info->call;
+
+       switch (call->state) {
+       case rx_call_cl_decoding_response:
+       case rx_call_sv_decoding_opcode:
+       case rx_call_sv_decoding_params:
+               break;
+       default:
+               /* Aborted or some other error */
+               return NULL;
+       }
+
+       debug("DAVAIL: %u %d\n", call->data_count, call->more_recv);
+
+       if (call->data_count > 0 || call->more_recv)
+               return PyLong_FromUnsignedLong(call->data_count);
+       else
+               Py_RETURN_NONE;
 }
 
 /*
  * Methods applicable to split-call objects
  */
-static PyMethodDef py_rx_split_methods[] = {
+static PyMethodDef py_rx_split_info_methods[] = {
        {"send", (PyCFunction)py_rx_split_send, METH_VARARGS,
         "" },
-       {"recv", (PyCFunction)py_rx_split_recv, METH_VARARGS,
+       {"will_recv_all", (PyCFunction)py_rx_split_will_recv_all, METH_NOARGS,
         "" },
+       {"begin_recv", (PyCFunction)py_rx_split_begin_recv, METH_VARARGS,
+        "" },
+       {"data_available", (PyCFunction)py_rx_split_data_available, METH_NOARGS,
+        "" },
+       {}
+};
+
+static PyMemberDef py_rx_split_info_members[] = {
+       { "state", T_UINT, offsetof(struct py_rx_split_info, state), 0, ""},
+       { "target",  T_OBJECT, offsetof(struct py_rx_split_info, target), 0, ""},
        {}
 };
 
@@ -90,24 +179,29 @@ static PyMethodDef py_rx_split_methods[] = {
 static int
 py_rx_split_init(PyObject *_self, PyObject *args, PyObject *kwds)
 {
-       struct py_rx_split_call *self = (struct py_rx_split_call *)_self;
+       struct py_rx_split_info *self = (struct py_rx_split_info *)_self;
        self->call = NULL;
+       self->target = NULL;
+       self->split_state = split_idle;
+       self->state = 0;
+       self->receiving_data = false;
        return 0;
 }
 
 static void
-py_rx_split_dealloc(struct py_rx_split_call *self)
+py_rx_split_dealloc(struct py_rx_split_info *self)
 {
-       assert(self->state != split_dead);
-       self->state = split_dead;
+       assert(self->split_state != split_dead);
+       self->split_state = split_dead;
        self->call = NULL;
+       Py_XDECREF(self->target);
        Py_TYPE(self)->tp_free((PyObject *)self);
 }
 
-PyTypeObject py_rx_split_callType = {
+PyTypeObject py_rx_split_infoType = {
        PyVarObject_HEAD_INIT(NULL, 0)
-       "kafs.rx_split_call",           /*tp_name*/
-       sizeof(struct py_rx_split_call), /*tp_basicsize*/
+       "kafs.rx_split_info",           /*tp_name*/
+       sizeof(struct py_rx_split_info), /*tp_basicsize*/
        0,                              /*tp_itemsize*/
        (destructor)py_rx_split_dealloc, /*tp_dealloc*/
        0,                              /*tp_print*/
@@ -132,8 +226,8 @@ PyTypeObject py_rx_split_callType = {
        0,                              /* tp_weaklistoffset */
        0,                              /* tp_iter */
        0,                              /* tp_iternext */
-       py_rx_split_methods,            /* tp_methods */
-       0,                              /* tp_members */
+       py_rx_split_info_methods,       /* tp_methods */
+       py_rx_split_info_members,       /* tp_members */
        0,                              /* tp_getset */
        0,                              /* tp_base */
        0,                              /* tp_dict */
@@ -146,13 +240,13 @@ PyTypeObject py_rx_split_callType = {
 };
 
 /*
- * Check a split-RPC object has transmit and receive functions.
+ * Allocate an information object.
  */
 PyObject *py_rxgen_split_client_prepare(void)
 {
-       struct py_rx_split_call *obj;
+       struct py_rx_split_info *obj;
 
-       obj = (struct py_rx_split_call *)_PyObject_New(&py_rx_split_callType);
+       obj = (struct py_rx_split_info *)_PyObject_New(&py_rx_split_infoType);
        if (!obj)
                return PyErr_NoMemory();
        py_rx_split_init((PyObject *)obj, NULL, NULL);
@@ -162,23 +256,27 @@ PyObject *py_rxgen_split_client_prepare(void)
 /*
  * Perform split-RPC transmission.
  */
-int py_rxgen_split_client_transmit(PyObject *split, PyObject *_split_call)
+int py_rxgen_split_transmit(struct rx_call *call)
 {
-       struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
+       struct py_rx_split_info *split_info = call->decoder_split_info;
+       PyObject *callback = call->decoder_split_callback;
        PyObject *result;
        int ret;
 
-       assert(split_call->state == split_idle);
+       assert(split_info->split_state == split_idle);
 
-       split_call->state = split_transmitting;
-       result = PyObject_CallMethod(split, "client_transmit", "O", _split_call);
-       if (result) {
+       split_info->split_state = split_transmitting;
+       result = PyObject_CallMethod(callback, "transmit",
+                                    "O", call->decoder_split_info);
+       if (result == Py_None) {
+               ret = py_rx_split_do_send_recv(call, split_info, false);
+       } else if (result) {
                ret = PyObject_IsTrue(result) ? 0 : -1;
                Py_DECREF(result);
        } else {
                ret = -1;
        }
-       if (ret == 0 && split_call->state != split_idle)
+       if (ret == 0 && split_info->split_state != split_idle)
                abort();
        return ret;
 }
@@ -186,24 +284,62 @@ int py_rxgen_split_client_transmit(PyObject *split, PyObject *_split_call)
 /*
  * Perform split-RPC reception.
  */
-int py_rxgen_split_client_receive(PyObject *split, PyObject *_split_call)
+int py_rxgen_split_receive(struct rx_call *call, bool init)
 {
-       struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call;
+       struct py_rx_split_info *split_info = call->decoder_split_info;
+       PyObject *callback = call->decoder_split_callback;
        PyObject *result;
        int ret;
 
-       assert(split_call->state == split_idle);
+       assert(split_info->split_state == split_idle);
 
-       split_call->state = split_receiving;
-       result = PyObject_CallMethod(split, "client_receive", "O", _split_call);
-       split_call->state = split_idle;
-       assert(result != NULL);
+       debug("-->%s(%u,%u,%u)\n", __func__, init, split_info->state, split_info->receiving_data);
 
-       if (result) {
-               ret = PyObject_IsTrue(result) ? 0 : -1;
-               Py_DECREF(result);
+       if (init)
+               split_info->state = 0;
+
+again:
+       if (split_info->receiving_data) {
+               switch (py_dec_into_buffer(call)) {
+               case -1:
+                       result = PyObject_CallMethod(callback, "receive_failed",
+                                                    "O", call->decoder_split_info);
+                       Py_XDECREF(result);
+                       return -1;
+               case  1: /* More data needed */
+                       return 1;
+               case  0:
+                       split_info->receiving_data = false;
+                       split_info->state++;
+                       break;
+               }
+       }
+
+       split_info->split_state = split_receiving;
+       result = PyObject_CallMethod(callback, "receive",
+                                    "O", call->decoder_split_info);
+       split_info->split_state = split_idle;
+
+       if (!result)
+               return -1;
+
+       if (result == Py_None) {
+               /* Split receive phase not used */
+               call->need_size = 0;
+               ret = 0;
+       } else if (result == Py_False) {
+               /* Split receive phase complete */
+               ret = 0;
+       } else if (result == Py_True) {
+               /* Wanting to receive data or reenter in a new state */
+               if (split_info->receiving_data)
+                       goto again;
+               ret = 1;
        } else {
+               PyErr_Format(PyExc_TypeError,
+                            "Expected True, False or None return from split receive function");
                ret = -1;
        }
+       Py_DECREF(result);
        return ret;
 }
diff --git a/rxgen.h b/rxgen.h
index 3ac09a2647dd72ddad0a2b500352e516d44cd4e8..eeb123d5d16768d98b2c5b86b776c6ddd00c4eac 100644 (file)
--- a/rxgen.h
+++ b/rxgen.h
@@ -15,6 +15,7 @@
 #include "af_rxrpc.h"
 #include <stdbool.h>
 #include <errno.h>
+#include <stdlib.h>
 
 typedef uint32_t net_xdr_t;
 
@@ -64,12 +65,13 @@ struct rx_call {
        enum rx_call_state state;
        unsigned        known_to_kernel : 1;
        unsigned        secured : 1;
-       unsigned        more : 1;
+       unsigned        more_send : 1;
+       unsigned        more_recv : 1;
        int             error_code;
        uint32_t        abort_code;
        unsigned        need_size;
 
-       unsigned long long bytes_sent, bytes_received;
+       unsigned long long bytes_sent, bytes_received, blob_decoded;
 
        /* Service routines */
        void (*processor)(struct rx_call *call);
@@ -83,11 +85,15 @@ struct rx_call {
        struct rx_buf   *buffer_tail;
        unsigned        data_count;
        unsigned        buffer_space;
+       unsigned        padding_size;
 
        /* Decoding support */
        unsigned        phase;          /* Encode/decode phase */
+       unsigned        blob_size;      /* Size of blob being encoded/decoded */
+       unsigned        blob_offset;    /* Offset into blob */
        unsigned        bulk_count;     /* Number of items in bulk array */
        unsigned        bulk_index;     /* Index of item being processed */
+       void            *blob;          /* Blob being encoded/decoded */
        union {
                void            *bulk_item;     /* Pointer to string/bytes/struct being processed */
                uint32_t        bulk_u32;       /* 8/16/32-bit integer being processed */
@@ -96,15 +102,21 @@ struct rx_call {
        };
        int (*decoder)(struct rx_call *call);
        void            *decoder_private;
+       void            *decoder_manager;
+       void            *decoder_split_callback;
+       void            *decoder_split_info;
+       void (*decoder_cleanup)(struct rx_call *call);
 };
 
+extern uint32_t rxgen_dec_padding_sink;
 
-extern int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size);
+extern int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size);
 extern void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data);
 static inline void rxrpc_enc(struct rx_call *call, uint32_t data)
 {
        uint32_t xdr_data = htonl(data);
-       if (__builtin_expect(call->data_cursor < call->data_stop, 1)) {
+       if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 &&
+                            call->data_cursor < call->data_stop, 1)) {
                *(net_xdr_t *)call->data_cursor = xdr_data;
                call->data_cursor += 4;
        } else {
@@ -114,6 +126,7 @@ static inline void rxrpc_enc(struct rx_call *call, uint32_t data)
 
 static inline void rxrpc_enc_align(struct rx_call *call)
 {
+       abort(); // Can't assume data_cursor is 4-byte aligned
        while ((unsigned long)call->data_cursor & 3)
                *(call->data_cursor++) = 0;
 }
@@ -132,11 +145,12 @@ static inline int rxrpc_post_enc(struct rx_call *call)
        }
 }
 
-extern void rxrpc_dec_bytes(struct rx_call *call);
+extern void rxrpc_dec_blob(struct rx_call *call);
 extern uint32_t rxrpc_dec_slow(struct rx_call *call);
 static inline uint32_t rxrpc_dec(struct rx_call *call)
 {
-       if (__builtin_expect(call->data_cursor < call->data_stop, 1)) {
+       if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 &&
+                            call->data_cursor < call->data_stop, 1)) {
                net_xdr_t x = *(net_xdr_t *)call->data_cursor;
                call->data_cursor += sizeof(x);
                return ntohl(x);
@@ -148,6 +162,7 @@ static inline uint32_t rxrpc_dec(struct rx_call *call)
 static inline void rxrpc_dec_align(struct rx_call *call)
 {
        unsigned long cursor = (unsigned long)call->data_cursor;
+       abort(); // Can't assume data_cursor is 4-byte aligned
        cursor = (cursor + 3) & ~3UL;
        call->data_cursor = (uint8_t*)cursor;
 }
@@ -158,6 +173,7 @@ static inline int rxrpc_post_dec(struct rx_call *call)
                size_t n = call->data_cursor - call->data_start;
                call->data_start = call->data_cursor;
                call->data_count -= n;
+               call->need_size -= n;
                return 0;
        } else {
                errno = call->error_code;
@@ -165,6 +181,7 @@ static inline int rxrpc_post_dec(struct rx_call *call)
        }
 }
 
+extern void rxrpc_dec_advance_buffer(struct rx_call *call);
 extern int rxgen_dec_discard_excess(struct rx_call *call);
 
 extern struct rx_connection *rx_new_connection(const struct sockaddr *sa,
index bf6deacef60ac0f9e87f3b6a261a0e0ed99c8d39..493eeacb474b971a445eff1e8b50306f970481dc 100644 (file)
@@ -34,10 +34,7 @@ sub emit_func_prototype($)
 
        if ($p->{class} eq "array") {
            die $p->{where}, ": Array arg not supported";
-       } elsif ($p->{class} eq "bulk" &&
-           !($p->{elem}->{class} eq "string" ||
-             $p->{elem}->{class} eq "opaque")
-           ) {
+       } elsif ($p->{class} eq "bulk") {
            # Encode
            if ($p->{elem}->{class} eq "struct") {
                $proto  = "int (*get__" . $p->{name} . ")(struct rx_call *call, void *token)";
@@ -63,8 +60,7 @@ sub emit_func_prototype($)
            push @declines, "void *token__" . $p->{name};
            push @declines, "size_t nr__" . $p->{name};
            push @args, "token__" . $p->{name};
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                          $p->{elem}->{class} eq "opaque")) {
+       } elsif ($p->{class} eq "blob") {
            $proto = $p->{type} . " " . $p->{ptr} . $p->{name};
            push @enclines, "size_t nr__" . $p->{name};
            push @enclines, "const " . $proto;
@@ -191,12 +187,38 @@ sub emit_func_encode($$$$)
            ) {
            $phase->{size} +=  $p->{xdr_size};
            push @{$phase->{params}}, $p;
+       } elsif ($p->{class} eq "blob") {
+           $have_bulk = 2;
+
+           # Bulk objects begin with an element count
+           $phase->{elem_count} = $phase->{size};
+           $phase->{size} +=  4;
+
+           my %pseudoparam = (
+               class   => "basic",
+               type    => "blob_size",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               where   => $p->{where},
+               xdr_size => $p->{xdr_size},
+           );
+           push @{$phase->{params}}, \%pseudoparam;
+
+           # Create a new phase
+           $phase = {
+               type    => "blob",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               params  => [ $p ],
+               xdr_size => $p->{xdr_size},
+               size    => $p->{xdr_size},
+           };
+           push @phases, $phase;
+
+           $phase->{size} = 4;
+           $phase = 0;
        } elsif ($p->{class} eq "bulk") {
            $have_bulk = 1 if ($have_bulk == 0);
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               $have_bulk = 2;
-           }
 
            # Bulk objects begin with an element count
            $phase->{elem_count} = $phase->{size};
@@ -223,16 +245,11 @@ sub emit_func_encode($$$$)
            };
            push @phases, $phase;
 
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               $phase->{size} = 4;
-           } else {
-               # We don't want to be sending one object at a time if they're
-               # really small.
-               my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
-               $n_buf *= $p->{xdr_size};
-               $phase->{size} = $p->{xdr_size};
-           }
+           # We don't want to be sending one object at a time if they're
+           # really small.
+           my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
+           $n_buf *= $p->{xdr_size};
+           $phase->{size} = $p->{xdr_size};
            $phase = 0;
        } else {
            die $p->{where}, "Encode array arg not supported";
@@ -248,7 +265,7 @@ sub emit_func_encode($$$$)
 
     unless (@params) {
        die if ($side eq "client");
-       print RXOUT "\tcall->more = 0;\n";
+       print RXOUT "\tcall->more_send = 0;\n";
        print RXOUT "\treturn 0;\n";
        print RXOUT "}\n";
        return;
@@ -263,7 +280,7 @@ sub emit_func_encode($$$$)
     print RXOUT "\tswitch (phase) {\n";
 
     print RXOUT "\tcase 0:\n";
-    print RXOUT "\t\tcall->more = 1;\n";
+    print RXOUT "\t\tcall->more_send = 1;\n";
 
     my $phase_goto_label = 0;
     my $phix;
@@ -277,7 +294,14 @@ sub emit_func_encode($$$$)
        }
 
        # Determine how big bulk objects are
-       if ($phase->{type} eq "bulk") {
+       if ($phase->{type} eq "blob") {
+           my $p = $phase->{params}->[0];
+           print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n";
+           print RXOUT "\t\tif (call->blob_size == 0)\n";
+           print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+           $phase_goto_label = $phix + 1;
+           print RXOUT "\t\tcall->blob_offset = 0;\n";
+       } elsif ($phase->{type} eq "bulk") {
            my $p = $phase->{params}->[0];
            print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n";
            print RXOUT "\t\tif (call->bulk_count == 0)\n";
@@ -292,7 +316,11 @@ sub emit_func_encode($$$$)
 
        # Marshal the data
        foreach my $p (@{$phase->{params}}) {
-           if ($p->{type} eq "bulk_size") {
+           if ($p->{type} eq "blob_size") {
+               print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n";
+               $close_phase = 0;
+               next;
+           } elsif ($p->{type} eq "bulk_size") {
                print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n";
                $close_phase = 0;
                next;
@@ -337,7 +365,12 @@ sub emit_func_encode($$$$)
                die $p->{where}, ": Unsupported type in decode";
            }
 
-           if ($p->{class} eq "bulk") {
+           if ($p->{class} eq "blob") {
+               print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+               print RXOUT "\t\t\tphase = ", $phix, ";\n";
+               print RXOUT "\t\t\tgoto select_phase;\n";
+               print RXOUT "\t\t}\n";
+           } elsif ($p->{class} eq "bulk") {
                print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
                print RXOUT "\t\t\tphase = ", $phix, ";\n";
                print RXOUT "\t\t\tgoto select_phase;\n";
@@ -356,7 +389,7 @@ sub emit_func_encode($$$$)
     print RXOUT "\tcase ", $phix, ":\n";
     print RXOUT "\t\tif (rxrpc_post_enc(call) < 0)\n";
     print RXOUT "\t\t\treturn -1;\n";
-    print RXOUT "\t\tcall->more = 0;\n";
+    print RXOUT "\t\tcall->more_send = 0;\n";
     print RXOUT "\t\tbreak;\n";
     print RXOUT "\t}\n";
 
@@ -397,6 +430,33 @@ sub emit_func_decode($$$$)
            ) {
            $phase->{size} +=  $p->{xdr_size};
            push @{$phase->{params}}, $p;
+       } elsif ($p->{class} eq "blob") {
+           $have_bulk = 1;
+
+           # Bulk objects begin with an element count
+           $phase->{elem_count} = $phase->{size};
+           $phase->{size} +=  4;
+
+           my %pseudoparam = (
+               class   => "basic",
+               type    => "blob_size",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               where   => $p->{where},
+               xdr_size => $p->{xdr_size},
+               );
+           push @{$phase->{params}}, \%pseudoparam;
+
+           # Create a new phase
+           $phase = {
+               type => "blob",
+               name => $p->{name},
+               params => [ $p ],
+               size => 4,
+               xdr_size => $p->{xdr_size},
+           };
+           push @phases, $phase;
+           $phase = 0;
        } elsif ($p->{class} eq "bulk") {
            $have_bulk = 1;
 
@@ -423,11 +483,6 @@ sub emit_func_decode($$$$)
            };
            push @phases, $phase;
 
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               $phase->{bytearray} = 1;
-           }
-
            # We don't want to be asking recvmsg() for one object at a time if
            # they're really small.
            my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1;
@@ -475,16 +530,23 @@ sub emit_func_decode($$$$)
        }
 
        # Determine how big bulk objects are
-       if ($phase->{type} eq "bulk") {
+       if ($phase->{type} eq "blob") {
+           my $p = $phase->{params}->[0];
+           print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n";
+           print RXOUT "\t\tcall->blob_offset = UINT_MAX;\n";
+           print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
+           print RXOUT "\t\t\treturn -1;\n";
+           print RXOUT "\t\tif (call->blob_size == 0)\n";
+           print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+           $phase_goto_label = $phix + 1;
+           print RXOUT "\t\tcall->blob_offset = 0;\n";
+       } elsif ($phase->{type} eq "bulk") {
            my $p = $phase->{params}->[0];
            print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n";
            print RXOUT "\t\tcall->bulk_index = UINT_MAX;\n";
 
            if ($p->{elem}->{class} eq "basic") {
                print RXOUT "\t\tif (", $ptr, "store__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
-           } elsif ($p->{elem}->{class} eq "string" ||
-                    $p->{elem}->{class} eq "opaque") {
-               print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
            } else {
                print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n";
            }
@@ -502,8 +564,9 @@ sub emit_func_decode($$$$)
        print RXOUT "\tcase ", $phix, ":\n";
 
        print RXOUT "\t\tif (count < ", $phase->{size}, ")";
-       if ($phase->{type} eq "bulk" && !exists($phase->{bytearray}) &&
-           $phase->{xdr_size} <= 512) {
+       if ($phase->{type} eq "bulk" &&
+           $phase->{xdr_size} <= 512
+           ) {
            print RXOUT " {\n";
            print RXOUT "\t\t\tunsigned n = call->bulk_count - call->bulk_index;\n";
            print RXOUT "\t\t\tn = MIN(n, ", int(1024 / $phase->{xdr_size}), ");\n";
@@ -518,7 +581,10 @@ sub emit_func_decode($$$$)
        # Unmarshal the data
        print RXOUT "\n";
        foreach my $p (@{$phase->{params}}) {
-           if ($p->{type} eq "bulk_size") {
+           if ($p->{type} eq "blob_size") {
+               print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n";
+               next;
+           } elsif ($p->{type} eq "bulk_size") {
                print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n";
                next;
            }
@@ -542,9 +608,8 @@ sub emit_func_decode($$$$)
                print RXOUT "\t\t\treturn -1;\n";
                print RXOUT "\t\trxgen_decode_", $p->{type}, "(call, call->bulk_item);\n";
                print RXOUT "\t\tcall->bulk_index++;\n";
-           } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                              $p->{elem}->{class} eq "opaque")) {
-               print RXOUT "\t\trxrpc_dec_bytes(call);\n";
+           } elsif ($p->{class} eq "blob") {
+               print RXOUT "\t\trxrpc_dec_blob(call);\n";
                print RXOUT "\t\trxrpc_dec_align(call);\n";
            } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
                print RXOUT "\t\t", $ptr, $p->{name}, " = rxrpc_dec(call);\n";
@@ -557,7 +622,14 @@ sub emit_func_decode($$$$)
                die $p->{where}, ": Unsupported type in decode";
            }
 
-           if ($p->{class} eq "bulk") {
+           if ($p->{class} eq "blob") {
+               print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+               print RXOUT "\t\t\treturn -1;\n";
+               print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+               print RXOUT "\t\t\tphase = ", $phix, ";\n";
+               print RXOUT "\t\t\tgoto select_phase;\n";
+               print RXOUT "\t\t}\n";
+           } elsif ($p->{class} eq "bulk") {
                print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
                print RXOUT "\t\t\treturn -1;\n";
                print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
@@ -567,7 +639,7 @@ sub emit_func_decode($$$$)
            }
        }
 
-       if ($phase->{type} ne "bulk") {
+       if ($phase->{type} ne "blob" || $phase->{type} ne "bulk") {
            print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
            print RXOUT "\t\t\treturn -1;\n";
        }
@@ -629,31 +701,35 @@ sub emit_func_send($$)
 
     print RXOUT "\tstruct rx_call *call;\n" if ($what eq "request");
 
-    my @all_bulk_params = grep { $_->{class} eq "bulk"; } @{$params};
-
-    my @bulk_params = grep { ($_->{elem}->{class} ne "string" &&
-                             $_->{elem}->{class} ne "opaque"); } @all_bulk_params;
+    my @blob_params = grep { $_->{class} eq "blob"; } @{$params};
+    my @bulk_params = grep { $_->{class} eq "bulk"; } @{$params};
 
     # Local variables
     print RXOUT "\tint ret;\n";
 
     # Check lengths
-    if (@all_bulk_params) {
+    if (@blob_params || @bulk_params) {
        print RXOUT "\n";
        print RXOUT "\tif (";
        my $first = 1;
-       foreach my $p (@all_bulk_params) {
+       foreach my $p (@blob_params) {
            if ($first) {
                $first = 0;
            } else {
                print RXOUT " ||\n\t    ";
            }
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               print RXOUT "!", $p->{name};
+           print RXOUT "!", $p->{name};
+           if (exists($p->{dim})) {
+               print RXOUT " || nr__", $p->{name}, " > ", $p->{dim};
+           }
+       }
+       foreach my $p (@bulk_params) {
+           if ($first) {
+               $first = 0;
            } else {
-               print RXOUT "!get__", $p->{name};
+               print RXOUT " ||\n\t    ";
            }
+           print RXOUT "!get__", $p->{name};
            if (exists($p->{dim})) {
                print RXOUT " || nr__", $p->{name}, " > ", $p->{dim};
            }
@@ -685,10 +761,23 @@ sub emit_func_send($$)
            print RXOUT "\trxrpc_enc(call, (uint32_t)(", $p->{name}, " >> 32));\n";
        } elsif ($p->{class} eq "struct") {
            print RXOUT "\trxgen_encode_", $p->{type}, "(call, ", $p->{name}, ");\n";
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                          $p->{elem}->{class} eq "opaque")) {
-           print RXOUT "\trxrpc_enc_bytes(call, ", $p->{name}, ", nr__", $p->{name}, ");\n";
+       } elsif ($p->{class} eq "blob") {
+           print RXOUT "\trxrpc_enc_blob(call, ", $p->{name}, ", nr__", $p->{name}, ");\n";
            print RXOUT "\trxrpc_enc_align(call);\n";
+       } elsif ($p->{class} eq "blob") {
+           print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n";
+           print RXOUT "\tcall->blob_size = nr__", $p->{name}, ";\n";
+           print RXOUT "\tfor (call->blob_offset = 0; call->blob_offset < call->blob_size; call->blob_offset++) {\n";
+           if ($p->{elem}->{class} eq "struct") {
+               print RXOUT "\t\tstruct ", $p->{elem}->{type}, " x;\n";
+           } else {
+               print RXOUT "\t\t", $p->{elem}->{type}, " x;\n";
+           }
+           print RXOUT "\t\tcall->blob = &x;\n";
+           print RXOUT "\t\tif (get__", $p->{name}, "(call, token__", $p->{name}, ") < 0)\n";
+           print RXOUT "\t\t\tgoto error;\n";
+           die $p->{where}, "No decoding for array type '$type'";
+           print RXOUT "\t}\n";
        } elsif ($p->{class} eq "bulk") {
            print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n";
            print RXOUT "\tcall->bulk_count = nr__", $p->{name}, ";\n";
@@ -723,7 +812,7 @@ sub emit_func_send($$)
 
     print RXOUT "\tif (rxrpc_post_enc(call) < 0)\n";
     print RXOUT "\t\tgoto error;\n";
-    print RXOUT "\tcall->more = 0;\n";
+    print RXOUT "\tcall->more_send = 0;\n";
 
     # Send the message
     print RXOUT "\n";
index 1760eb03a0599f34ac4d9b99e93426b533f891c1..6511a1e3201a0a2e965d24ecc86ff1cfbbb762f9 100644 (file)
@@ -85,7 +85,7 @@ sub emit_py_module() {
     if (@py_type_defs) {
        print PYOUT "\tif (";
        print PYOUT "PyType_Ready(&py_rx_connectionType) < 0 ||\n\t    ";
-       print PYOUT "PyType_Ready(&py_rx_split_callType) < 0";
+       print PYOUT "PyType_Ready(&py_rx_split_infoType) < 0";
        my $first = 0;
        foreach my $def (@py_type_defs) {
            print PYOUT " ||\n\t    " unless ($first);
index 821bf243f5316c07945584b05a3180578a283cdb..dd3c609d6ace7c7ca8f381d61c04f7ca84ee6109 100644 (file)
@@ -54,7 +54,7 @@ sub emit_py_func_param_object($$) {
                push @complex, $p;
                print PYHDR "\t\tPyObject\t*", $p->{name}, ";\n";
            }
-           $have_opaque = 1 if ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque");
+           $have_opaque = 1 if ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque");
        }
        print PYHDR "\t} x;\n";
        print PYHDR "\tPy_buffer dec_buf;\n" if ($have_opaque);
@@ -96,7 +96,8 @@ sub emit_py_func_param_object($$) {
        print PYOUT "static PyMemberDef ", $struct_req, "_members[] = {\n";
        foreach my $p (@{$params}) {
            print PYOUT "\t{ \"", $p->{name}, "\", ";
-           if ($p->{class} eq "bulk") {                print PYOUT "T_OBJECT_EX";
+           if ($p->{class} eq "blob") {                print PYOUT "T_OBJECT_EX";
+           } elsif ($p->{class} eq "bulk") {           print PYOUT "T_OBJECT_EX";
            } elsif ($p->{type} eq "char"    ) {        print PYOUT "T_CHAR";
            } elsif ($p->{type} eq "int8_t"  ) {        print PYOUT "T_BYTE";
            } elsif ($p->{type} eq "int16_t" ) {        print PYOUT "T_SHORT";
@@ -174,7 +175,6 @@ sub emit_py_func_bulk_helper($)
 
     foreach my $p (@{$func->{params}}) {
        next if ($p->{class} ne "bulk");
-       next if ($p->{elem}->{class} eq "string" || $p->{elem}->{class} eq "opaque");
 
        # Data encoding
        if (!exists $bulk_get_helpers{$p->{type}}) {
@@ -255,8 +255,7 @@ sub emit_py_func_simple_sync_call($)
     print PYOUT "\tstruct py_rx_connection *z_conn;\n";
     print PYOUT "\tstruct py_", $func->{name}, "_response *response;\n";
     foreach my $p (@{$func->{request}}) {
-       if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                     $p->{elem}->{class} eq "opaque")) {
+       if ($p->{class} eq "blob") {
            print PYOUT "\tPy_buffer param_", $p->{name}, ";\n";
        } elsif ($p->{class} eq "basic") {
            print PYOUT "\t", $p->{type}, " param_", $p->{name}, ";\n";
@@ -272,7 +271,7 @@ sub emit_py_func_simple_sync_call($)
        }
     }
 
-    print PYOUT "\tPyObject *split, *split_call;\n" if ($func->{split});
+    print PYOUT "\tPyObject *split_callback, *split_info;\n" if ($func->{split});
     print PYOUT "\tPyObject *res = NULL;\n";
     print PYOUT "\tint ret;\n";
 
@@ -292,12 +291,11 @@ sub emit_py_func_simple_sync_call($)
        } elsif ($p->{type} eq "uint32_t") {    print PYOUT "I";
        } elsif ($p->{type} eq "uint64_t") {    print PYOUT "K";
        } elsif ($p->{class} eq "struct") {     print PYOUT "O!";
-       } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") {
+       } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "string") {
            print PYOUT "s*";
-       } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") {
+       } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque") {
            print PYOUT "z*";
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
-                                          $p->{elem}->{class} eq "struct")) {
+       } elsif ($p->{class} eq "bulk") {
            print PYOUT "O!";
        } else {
            die $p->{where}, ": No py parse for param";
@@ -311,55 +309,53 @@ sub emit_py_func_simple_sync_call($)
     foreach my $p (@{$func->{request}}) {
        print PYOUT ",\n";
        print PYOUT "\t\t\t      ";
-       if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                     $p->{elem}->{class} eq "opaque")) {
-           print PYOUT "&param_", $p->{name};
-       } elsif ($p->{class} eq "basic") {
+       if ($p->{class} eq "basic") {
            print PYOUT "&param_", $p->{name};
        } elsif ($p->{class} eq "struct") {
            print PYOUT "&py_", $p->{type}, "Type, &param_", $p->{name};
+       } elsif ($p->{class} eq "blob") {
+           print PYOUT "&param_", $p->{name};
        } elsif ($p->{class} eq "bulk") {
            print PYOUT "&PyList_Type, &param_", $p->{name};
        } else {
            die $p->{where}, ": Unsupported type \"", $p->{type}, "\"";
        }
     }
-    print PYOUT ",\n\t\t\t      &split" if ($func->{split});
+    print PYOUT ",\n\t\t\t      &split_callback" if ($func->{split});
     print PYOUT "))\n";
     print PYOUT "\t\treturn NULL;\n";
 
     if ($func->{split}) {
        print PYOUT "\n";
-       print PYOUT "\tsplit_call = py_rxgen_split_client_prepare();\n";
-       print PYOUT "\tif (!split_call)\n";
+       print PYOUT "\tsplit_info = py_rxgen_split_client_prepare();\n";
+       print PYOUT "\tif (!split_info)\n";
        print PYOUT "\t\treturn NULL;\n";
     }
 
     print PYOUT "\n";
     print PYOUT "\tcall = rxrpc_alloc_call(z_conn->x, 0);\n";
     print PYOUT "\tif (!call) {\n";
-    print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
+    print PYOUT "\t\tPy_XDECREF(split_info);\n" if ($func->{split});
     print PYOUT "\t\treturn PyErr_NoMemory();\n";
     print PYOUT "\t}\n";
-    print PYOUT "\tpy_rxgen_split_client_set(split_call, call);\n" if ($func->{split});
+    print PYOUT "\tcall->decoder_cleanup = py_rxgen_decoder_cleanup;\n";
+    print PYOUT "\tpy_rxgen_split_client_set(call, split_callback, split_info);\n"
+       if ($func->{split});
 
     # Marshal the arguments
     print PYOUT "\n";
     print PYOUT "\trxrpc_enc(call, ", $func->{opcode}, ");\n";
     foreach my $p (@{$func->{request}}) {
-       if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
-                                     $p->{elem}->{class} eq "struct")) {
-           print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n";
-           print PYOUT "\t\tgoto error;\n";
-       } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" ||
-                                          $p->{elem}->{class} eq "opaque")) {
+       if ($p->{class} eq "blob") {
            my $dim = -1;
            $dim = $p->{dim} if exists $p->{dim};
            print PYOUT "\tif (py_enc_buffer(call, &param_", $p->{name}, ", ", $dim, ") < 0) {\n";
            print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n";
-           print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
            print PYOUT "\t\treturn NULL;\n";
            print PYOUT "\t}\n";
+       } elsif ($p->{class} eq "bulk") {
+           print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n";
+           print PYOUT "\t\tgoto error;\n";
        } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
            print PYOUT "\trxrpc_enc(call, param_", $p->{name}, ");\n";
        } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) {
@@ -368,7 +364,6 @@ sub emit_py_func_simple_sync_call($)
        } elsif ($p->{class} eq "struct") {
            print PYOUT "\tif (py_premarshal_", $p->{type}, "((PyObject *)param_", $p->{name}, ")) {\n";
            print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n";
-           print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split});
            print PYOUT "\t\treturn NULL;\n";
            print PYOUT "\t}\n";
            print PYOUT "\trxgen_encode_", $p->{type}, "(call, &param_", $p->{name}, "->x);\n";
@@ -393,10 +388,10 @@ sub emit_py_func_simple_sync_call($)
 
     # Transmit the split data
     if ($func->{split}) {
-       print PYOUT "\tif (py_rxgen_split_client_transmit(split, split_call) < 0)\n";
+       print PYOUT "\tif (py_rxgen_split_transmit(call) < 0)\n";
        print PYOUT "\t\tgoto error_no_res;\n";
     } else {
-       print PYOUT "\tcall->more = 0;\n";
+       print PYOUT "\tcall->more_send = 0;\n";
 
        # Make the call
        print PYOUT "\n";
@@ -418,7 +413,6 @@ sub emit_py_func_simple_sync_call($)
     # Successful return
     print PYOUT "\n";
     print PYOUT "\trxrpc_terminate_call(call, 0);\n";
-    print PYOUT "\tPy_XDECREF(split_call);\n" if ($func->{split});
     print PYOUT "\treturn res;\n";
 
     # Error cleanups
@@ -460,11 +454,27 @@ sub emit_py_func_decode($$$$)
     # in the previous phase.  Variable-sized bulk arrays are split across
     # multiple phases, with the length being at the end of the first phase and
     # the data in the second.
+    #
+    # We also need to interpolate a phase to deal with decoding split-op
+    # auxiliary data.  This comes last when decoding the request and first when
+    # decoding the response.
+    #
     my @phases = ();
     my $phase = 0;
     my $have_bulk = 0;
     my $want_item = 0;
 
+    if ($func->{split} && $subname eq "response") {
+       $phase = {
+           type => "split",
+           size => "py_rxgen_split_receive(call)",
+           params => []
+       };
+       push @phases, $phase;
+       $phase = 0;
+       $have_bulk = 1;
+    }
+
     foreach my $p (@params) {
        unless ($phase) {
            $phase = { type => "flat", size => 0, params => [] };
@@ -476,6 +486,36 @@ sub emit_py_func_decode($$$$)
            ) {
            $phase->{size} +=  $p->{xdr_size};
            push @{$phase->{params}}, $p;
+       } elsif ($p->{class} eq "blob") {
+           $have_bulk = 1;
+
+           # Bulk objects begin with an element count
+           $phase->{elem_count} = $phase->{size};
+           $phase->{size} +=  4;
+
+           my %pseudoparam = (
+               class   => "basic",
+               type    => "blob_size",
+               name    => $p->{name},
+               elem    => $p->{elem},
+               where   => $p->{where},
+               xdr_size => $p->{xdr_size},
+               );
+           push @{$phase->{params}}, \%pseudoparam;
+
+           # Create a new phase
+           $phase = {
+               type => "blob",
+               name => $p->{name},
+               params => [ $p ],
+               xdr_size => $p->{xdr_size},
+           };
+           push @phases, $phase;
+
+           # We don't want to be asking recvmsg() for one object at a time if
+           # they're really small.
+           $phase->{size} = $p->{xdr_size};
+           $phase = 0;
        } elsif ($p->{class} eq "bulk") {
            $have_bulk = 1;
 
@@ -502,12 +542,7 @@ sub emit_py_func_decode($$$$)
            };
            push @phases, $phase;
 
-           if ($p->{elem}->{class} eq "string" ||
-               $p->{elem}->{class} eq "opaque") {
-               ;
-           } else {
-               $want_item = 1;
-           }
+           $want_item = 1;
 
            # We don't want to be asking recvmsg() for one object at a time if
            # they're really small.
@@ -518,19 +553,31 @@ sub emit_py_func_decode($$$$)
        }
     }
 
+    if ($func->{split} && $subname eq "request") {
+       $phase = {
+           type => "split",
+           size => "py_rxgen_split_receive(call)",
+           params => []
+       };
+       push @phases, $phase;
+       $phase = 0;
+       $have_bulk = 1;
+    }
+
     # Function definition and arguments
     print PYOUT "\n";
     print PYOUT "int py_", $func->{name}, "_decode_", $subname, "(struct rx_call *call)\n";
     print PYOUT "{\n";
 
-    unless (@params) {
+    unless (@params || $func->{split}) {
        print PYOUT "\treturn 0;\n";
        print PYOUT "}\n";
        return;
     }
 
     # Local variables
-    print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n";
+    print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n"
+       if (@params);
     print PYOUT "\tPyObject *item;\n" if ($want_item);
     print PYOUT "\tunsigned phase = call->phase;\n";
     print PYOUT "\tunsigned count;\n";
@@ -539,6 +586,7 @@ sub emit_py_func_decode($$$$)
     print PYOUT "\n";
     print PYOUT "select_phase:\n" if ($have_bulk);
     print PYOUT "\tcount = call->data_count;\n";
+    #print PYOUT "\tprintf(\"-- Phase %u (%u) --\\n\", phase, count);\n";
     print PYOUT "\tswitch (phase) {\n";
 
     print PYOUT "\tcase 0:\n";
@@ -555,23 +603,32 @@ sub emit_py_func_decode($$$$)
        }
 
        # Determine how big bulk objects are
-       if ($phase->{type} eq "bulk") {
+       if ($phase->{type} eq "blob") {
            my $p = $phase->{params}->[0];
-           if ($p->{elem}->{class} eq "basic" ||
-               $p->{elem}->{class} eq "struct") {
-               print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n";
-               print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
-               print PYOUT "\t\t\treturn -1;\n";
-           } elsif ($p->{elem}->{class} eq "string") {
-               print PYOUT "\t\tobj->x.", $p->{name}, " = PyUnicode_New(call->bulk_count, 255);\n";
-               print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
-               print PYOUT "\t\t\treturn -1;\n";
-               print PYOUT "\t\tcall->bulk_item = obj->x.", $p->{name}, ";\n";
+           if ($p->{elem}->{class} eq "string") {
+               print PYOUT "\t\tswitch (py_dec_init_string(call, &obj->x.", $p->{name}, ")) {\n";
            } elsif ($p->{elem}->{class} eq "opaque") {
                print PYOUT "\t\tobj->x.", $p->{name}, " = PyByteArray_FromStringAndSize(\"\", 0);\n";
                print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
                print PYOUT "\t\t\treturn -1;\n";
-               print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->bulk_count) == -1)\n";
+               print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->blob_size) == -1)\n";
+               print PYOUT "\t\t\treturn -1;\n";
+
+               print PYOUT "\t\tswitch (py_dec_init_opaque(call, obj->x.", $p->{name}, ")) {\n";
+           } else {
+               die;
+           }
+           print PYOUT "\t\tcase -1: return -1;\n";
+           print PYOUT "\t\tcase  0: goto phase_", $phix + 1, ";\n";
+           print PYOUT "\t\tcase  1: break;\n";
+           print PYOUT "\t\t}\n";
+           $phase_goto_label = $phix + 1;
+       } elsif ($phase->{type} eq "bulk") {
+           my $p = $phase->{params}->[0];
+           if ($p->{elem}->{class} eq "basic" ||
+               $p->{elem}->{class} eq "struct") {
+               print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n";
+               print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n";
                print PYOUT "\t\t\treturn -1;\n";
            } else {
                die;
@@ -584,24 +641,40 @@ sub emit_py_func_decode($$$$)
        }
 
        # Entry point for a phase
-       print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n";
+       if ($phase->{type} ne "split") {
+           print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n";
+       } else {
+           print PYOUT "\t\tif (py_rxgen_split_receive(call, 1) < 0)\n";
+           print PYOUT "\t\t\treturn -1;\n";
+           print PYOUT "\t\tif (call->need_size == 0)\n";
+           print PYOUT "\t\t\tgoto phase_", $phix + 1, ";\n";
+           $phase_goto_label = $phix + 1;
+       }
        print PYOUT "\t\tcall->phase = ", $phix, ";\n";
        print PYOUT "\tcase ", $phix, ":\n";
 
-       print PYOUT "\t\tif (count < ", $phase->{size}, ")\n";
-       print PYOUT "\t\t\treturn 1;\n";
+       if ($phase->{type} ne "split") {
+           print PYOUT "\t\tif (count < call->need_size)\n";
+           print PYOUT "\t\t\treturn 1;\n";
+       } else {
+           print PYOUT "\t\tif (call->need_size == UINT_MAX ? count == 0 : count < call->need_size) {\n";
+           #print PYOUT "\t\t\tprintf(\"NEED %u (phase %u)\\n\", call->need_size, phase);\n";
+           print PYOUT "\t\t\treturn 1;\n";
+           print PYOUT "\t\t}\n";
+       }
 
        # Unmarshal the data
        print PYOUT "\n";
        foreach my $p (@{$phase->{params}}) {
-           if ($p->{type} eq "bulk_size") {
+           if ($p->{type} eq "blob_size") {
+               print PYOUT "\t\tcall->blob_size = rxrpc_dec(call);\n";
+               next;
+           } elsif ($p->{type} eq "bulk_size") {
                print PYOUT "\t\tcall->bulk_count = rxrpc_dec(call);\n";
                next;
            }
 
-           if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" ||
-                                         $p->{elem}->{class} eq "struct")
-               ) {
+           if ($p->{class} eq "bulk") {
                if ($p->{elem}->{class} eq "struct") {
                    print PYOUT "\t\titem = py_decode_", $p->{type}, "(call);\n";
                } elsif ($p->{elem}->{xdr_size} == 4 && $p->{type} =~ /^u/) {
@@ -626,10 +699,16 @@ sub emit_py_func_decode($$$$)
                print PYOUT "\t\t\treturn -1;\n";
                print PYOUT "\t\tcall->bulk_index++;\n";
 
-           } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") {
-               print PYOUT "\t\tpy_dec_string(call);\n";
-           } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") {
-               print PYOUT "\t\tpy_dec_opaque(call, obj->x.", $p->{name}, ");\n";
+           } elsif ($p->{class} eq "blob") {
+               if ($p->{elem}->{class} eq "string") {
+                   print PYOUT "\t\tswitch (py_dec_into_string(call)) {\n";
+               } else {
+                   print PYOUT "\t\tswitch (py_dec_into_buffer(call)) {\n";
+               }
+               print PYOUT "\t\tcase -1: return -1;\n";
+               print PYOUT "\t\tcase  0: break;\n";
+               print PYOUT "\t\tcase  1: phase = ", $phix, "; goto select_phase;\n";
+               print PYOUT "\t\t}\n";
            } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) {
                print PYOUT "\t\tobj->x.", $p->{name}, " = (", $p->{type}, ")rxrpc_dec(call);\n";
            } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) {
@@ -641,7 +720,14 @@ sub emit_py_func_decode($$$$)
                die $p->{where}, ": Unsupported type in decode";
            }
 
-           if ($p->{class} eq "bulk") {
+           if ($p->{class} eq "blob" &&  $p->{elem}->{class} eq "string") {
+               print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+               print PYOUT "\t\t\treturn -1;\n";
+               print PYOUT "\t\tif (call->blob_offset < call->blob_size) {\n";
+               print PYOUT "\t\t\tphase = ", $phix, ";\n";
+               print PYOUT "\t\t\tgoto select_phase;\n";
+               print PYOUT "\t\t}\n";
+           } elsif ($p->{class} eq "bulk") {
                print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
                print PYOUT "\t\t\treturn -1;\n";
                print PYOUT "\t\tif (call->bulk_index < call->bulk_count) {\n";
@@ -651,7 +737,21 @@ sub emit_py_func_decode($$$$)
            }
        }
 
-       if ($phase->{type} ne "bulk") {
+       if ($phase->{type} eq "split") {
+           print PYOUT "\t\tswitch (py_rxgen_split_receive(call, 0)) {\n";
+           print PYOUT "\t\tcase -1: return -1;\n";
+           print PYOUT "\t\tcase  0: break;\n";
+           print PYOUT "\t\tcase  1: phase = ", $phix, "; goto select_phase;\n";
+           print PYOUT "\t\t}\n";
+           #print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
+           #print PYOUT "\t\t\treturn -1;\n";
+           #print PYOUT "\t\tif (call->need_size != 0) {\n";
+           #print PYOUT "\t\t\tphase = ", $phix, ";\n";
+           #print PYOUT "\t\t\tgoto select_phase;\n";
+           #print PYOUT "\t\t}\n";
+       }
+
+       if ($phase->{type} ne "bulk" && $phase->{type} ne "blob") {
            print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n";
            print PYOUT "\t\t\treturn -1;\n";
        }
index 9a2e18aeaca2abd3e1a7fb1bf9ed6ad0f7c2b046..9781a11d762c00e3afd7b1c1551115887db98e58 100755 (executable)
@@ -63,7 +63,7 @@ my $where = "";
 #
 # Each type is specified by a hash of the following elements:
 #
-#      class           Complexity class (basic, string, struct, array, bulk)
+#      class           Complexity class (basic, string, struct, blob, array, bulk)
 #      type            Basic/struct type (char, {u,}int{8,16,32,64}_t, opaque, struct name)
 #      elem            Ref to element type def (if array/bulk)
 #      multi           1 if array or bulk type, 0 otherwise
@@ -119,8 +119,16 @@ sub define_typedef($$$)
 
     my %combined = %{$type};
 
+    if (exists $flags->{class} && $flags->{class} eq "bulk") {
+       if ($type->{class} eq "string" ||
+           $type->{class} eq "opaque") {
+           $flags->{class} = "blob";
+       }
+    }
+
     if (exists $flags->{class} &&
-       ($flags->{class} eq "bulk" ||
+       ($flags->{class} eq "blob" ||
+        $flags->{class} eq "bulk" ||
         $flags->{class} eq "array")) {
        die $where, ": Typedef'ing array/bulk as array/bulk not supported\n"
            if ($type->{multi});
@@ -314,7 +322,7 @@ discarded_comments:
            if ($line =~ /([a-zA-Z_][a-zA-Z0-9_]*) ([a-zA-Z_][a-zA-Z0-9_]*);/) {
                my %member = %{look_up_type($1)};
                die $where, ": Don't support bulk constructs in structs\n"
-                   if ($member{class} eq "bulk");
+                   if ($member{class} eq "bulk" || $member{class} eq "blob");
                $member{name} = $2;
                $member{where} = $where;
                push $struct->{members}, \%member;
@@ -425,7 +433,12 @@ discarded_comments:
                if ($bulk_dim) {
                    die $where, ": Bulk-of-bulk parameters not supported\n"
                        if ($param{class} eq "bulk");
-                   $param{class} = "bulk";
+                   if ($type->{class} eq "string" ||
+                       $type->{class} eq "opaque") {
+                       $param{class} = "blob";
+                   } else {
+                       $param{class} = "bulk";
+                   }
                    $param{elem} = $type;
                    $param{dim} = $bulk_dim unless $bulk_dim eq -1;
                }
@@ -573,6 +586,11 @@ foreach $func (@funcs) {
            ;
        } elsif ($p->{class} eq "struct") {
            die unless (exists $p->{xdr_size});
+       } elsif ($p->{class} eq "blob") {
+           die $p->{where}, ": No element type" unless (exists $p->{elem});
+           if (exists $p->{dim}) {
+               die $where, ": Missing constant ", $p->{dim} unless exists $constants{$p->{dim}};
+           }
        } elsif ($p->{class} eq "bulk") {
            die $p->{where}, ": No element type" unless (exists $p->{elem});
            die $p->{where}, ": Element has no XDR size" unless (exists $p->{elem}->{xdr_size});
diff --git a/suite/commands/bos/getlog.py b/suite/commands/bos/getlog.py
new file mode 100644 (file)
index 0000000..8abf115
--- /dev/null
@@ -0,0 +1,121 @@
+#
+# AFS Server management toolkit: Server log fetcher
+# -*- coding: utf-8 -*-
+#
+
+__copyright__ = """
+Copyright (C) 2014 Red Hat, Inc. All Rights Reserved.
+Written by David Howells (dhowells@redhat.com)
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU General Public Licence version 2 as
+published by the Free Software Foundation.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public Licence for more details.
+
+You should have received a copy of the GNU General Public Licence
+along with this program; if not, write to the Free Software
+Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+
+from afs.exception import AFSException
+from afs.argparse import *
+from afs.lib.output import *
+import kafs
+import os
+
+help = "Print a server process's log file"
+
+command_arguments = [
+    [ "server",         get_bosserver,          "rs",         "<machine name>" ],
+    [ "file",           get_file_name,          "rs",         "<log file to examine>" ],
+    [ "cell",           get_cell,               "os",         "<cell name>" ],
+    [ "noauth",         get_auth,               "fn" ],
+    [ "localauth",      get_auth,               "fn" ],
+    [ "verbose",        get_verbose,            "fn" ],
+    [ "encrypt",        get_dummy,              "fn" ],
+]
+
+cant_combine_arguments = [
+    ( "cell",           "localauth" ),
+    ( "noauth",         "localauth" ),
+]
+
+argument_size_limits = {
+    "file"              : kafs.BOZO_BSSIZE,
+}
+
+description = r"""
+Print a server process's log file
+"""
+
+class split_handler:
+    def __init__(self):
+        pass
+
+    def transmit(self, split_info):
+        return None
+
+    # Receive state machine.  Returns True if more data is required to be read
+    # or another state must be transited to and False if the operation is now
+    # complete.
+    #
+    def receive(self, split_info):
+        # We want to receive everything we can - the entire response phase
+        # belongs to the split handler
+        if split_info.state == 0:
+            split_info.will_recv_all()
+            split_info.state = 1
+            return True
+
+        if split_info.state == 1:
+            avail = split_info.data_available()
+            if avail == None:
+                # Last byte read
+                return False
+            if avail == 0:
+                split_info.will_recv_all()
+                return True
+
+            if avail > 4096:
+                avail = 4096
+            buf = bytearray(avail)
+            split_info.target = buf
+            split_info.state = 2
+
+            # Request reception start.  This function will be reentered with
+            # phase incremented when it's done.  receive_failed() will be
+            # called instead upon failure.
+            return split_info.begin_recv(buf, False)
+
+        if split_info.state == 3:
+            buf = split_info.target
+            split_info.target = None
+
+            # Strip any terminal NUL chars
+            buf = buf.rstrip(b"\0")
+            output_raw(buf)
+            del buf
+
+            split_info.state = 1
+            return True
+
+        raise AFSException("Unexpected receive phase ", split_info.phase,
+                           " in getlog() split.receive()")
+
+    def receive_failed(self, split_info):
+        print("Receive failed in phase", split_info.phase)
+        split_info.target = None
+
+def main(params):
+    cell = params["cell"]
+    bos_conn = cell.open_bos_server(params["server"], params)
+
+    split = split_handler()
+
+    output("Fetching log file '", params["file"], "'...\n")
+    output_flush()
+    ret = kafs.BOZO_GetLog(bos_conn, params["file"], split)
index e9ada3248828c74dda7f8b8a65a8a9db494c7dc4..0de39f0f6503599afe6f5abf1009bfe821f174cc 100644 (file)
@@ -61,7 +61,7 @@ class split_handler:
         self.__size = size
         self.__pos = 0
 
-    def client_transmit(self, split_call):
+    def transmit(self, split_info):
         while self.__pos < self.__size:
             verbose("--- XMIT ", self.__pos, "/", self.__size, "\n")
             size = self.__size - self.__pos
@@ -72,11 +72,11 @@ class split_handler:
             buf = self.__fd.read(size)
             if len(buf) < size:
                 raise IOError("Short read on file " + self.__name)
-            split_call.send(buf, self.__pos < self.__size)
+            split_info.send(buf, self.__pos < self.__size)
         return True
 
-    def client_receive(self, split_call):
-        return True
+    def receive(self, split_info, inited):
+        return None
 
 def main(params):
     cell = params["cell"]
index 17906f70b5502d168648fe2b0e9a74767e99ac48..fe5b93235353361be4a415fd779b7ad3573d386f 100644 (file)
@@ -50,6 +50,13 @@ def output(*args):
     for i in args:
         sys.stdout.write(str(i))
 
+def output_raw(*args):
+    for i in args:
+        sys.stdout.buffer.write(i)
+
+def output_flush():
+    sys.stdout.flush()
+
 def outputf(formatstr, *args):
     sys.stdout.write(formatstr.format(*args))
 
index b4b936fab647a55833f16e46319d7589388e1bf8..b4ecc357c123c1757e9422f0883b702032197809 100644 (file)
@@ -172,7 +172,7 @@ def _main():
     except KeyboardInterrupt:
         sys.exit(1)
     except SystemExit:
-        pass
+        sys.exit(0)
     except:
         print('Unhandled exception:', file=sys.stderr)
         traceback.print_exc()