From e2f47f1249a35d24ddc7f14e8e68ae3cbd9a032f Mon Sep 17 00:00:00 2001 From: David Howells Date: Thu, 1 May 2014 15:53:03 +0100 Subject: [PATCH] Implement "bos getlog" and reengineer the decode path somewhat Implement the "bos getlog" subcommand - which requires making the receive side of split RPC function work. To this end, the following reengineering has been done to the decode side: (1) Lift the requirement that the Rx buffer cursor be 4-byte aligned as BOZO.GetLog() doesn't align the log data it returns. (2) Permit a request for an unknown - and possibly zero - amount of data to be set in call->need_size. BOZO.GetLog() doesn't tell you in advance how much data it will return: the data stops when the Rx receive phase ends with LAST_PACKET being flagged. (3) Separate blob decoding (strings & opaques) from bulk decoding in the member variables of the rx_call struct, where a bulk decode is now a sequence of blobs, structs or ints. (4) Handle blob decoding asynchronously, where the buffer into which a blob is being written may not represent contiguous memory. This is done by setting up with an init function, called once, and a decode-into function, called repeatedly whilst it returns 1. The function is only called when there is sufficient data in the receive buffers. (5) Decode padding asynchronously by working out up front for a blob how much padding it requires and then decoding it as its own blob at the end using a special source buffer as a marker to switch processing. (6) Perform split reception by adding additional states within the decode state machine to call out to the handler functions. The receive() method of the split_handler class provided to a split RPC function then implements its own state machine on top of the decoder state machine. Further: (*) rxrpc_recv_data() now reports ENODATA rather than EMSGSIZE if the receive phase ends with short data. (*) MSG_MORE handling has separate rx_call struct members for the send and receive phase now to avoid confusion. (*) rxrpc_enc/dec_slow() now use rxrpc_enc/dec_blob() to avoid duplicating a lot of code. (*) rxrpc_enc/dec() now also go to the slow path if the cursor is misaligned, which may mean the data to be read is split across buffers. (*) The rx_call struct now has a pointer to a cleanup function to clean up the decoder state at the end to deal with aborted calls. (*) rxrpc_post_dec() reduces call->need_size as well as call->data_count as we don't want to be waiting for N bytes to turn up for a size-N blob if we have already received N-1 bytes of it and only need one more byte. This does mean that call->need_size may need resetting more often. (*) Added an output_raw() python output method to permit bytearray objects to be printed. Conversion to a string means that control characters and quotes get escaped (eg. a newline char gets converted to \n). (*) Catching the SystemExit exception should not produce an error due to ret not existing as a variable. Signed-off-by: David Howells --- af_rxrpc.c | 224 +++++++------- py_rxgen.c | 548 ++++++++++++++++++++++------------ py_rxgen.h | 37 ++- py_rxsplit.c | 248 +++++++++++---- rxgen.h | 29 +- rxgen/emit_c_sync_funcs.pm | 203 +++++++++---- rxgen/emit_py_module.pm | 2 +- rxgen/emit_py_sync_funcs.pm | 226 ++++++++++---- rxgen/rxgen.pl | 26 +- suite/commands/bos/getlog.py | 121 ++++++++ suite/commands/bos/install.py | 8 +- suite/lib/output.py | 7 + suite/main.py | 2 +- 13 files changed, 1174 insertions(+), 507 deletions(-) create mode 100644 suite/commands/bos/getlog.py diff --git a/af_rxrpc.c b/af_rxrpc.c index 69ce2fc..91a671c 100644 --- a/af_rxrpc.c +++ b/af_rxrpc.c @@ -16,6 +16,7 @@ #include #include #include +#include #include #include "af_rxrpc.h" #include "rxgen.h" @@ -26,6 +27,8 @@ #define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0) +uint32_t rxgen_dec_padding_sink; + /* * dump the control messages */ @@ -283,7 +286,8 @@ struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn, call->magic = RXGEN_CALL_MAGIC; call->conn = z_conn; - call->more = 1; + call->more_send = true; + call->more_recv = true; call->buffer_head = buf; call->buffer_tail = buf; call->data_start = data; @@ -393,7 +397,7 @@ more_to_send: iov[ioc].iov_base = cursor->buf + io_cursor; iov[ioc].iov_len = end - io_cursor; if (cursor == call->buffer_tail) { - if (!call->more) + if (!call->more_send) more = 0; ioc++; break; @@ -445,10 +449,10 @@ more_to_send: if (call->data_count > 0) goto more_to_send; - if (call->data_count == 0 && !call->more) + if (call->data_count == 0 && !call->more_send) call->state++; - if (!call->more) { + if (!call->more_send) { if (call->state == rx_call_cl_encoding_params || call->state == rx_call_sv_encoding_response) call->state++; @@ -494,7 +498,7 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait) msg.msg_flags = 0; ret = recvmsg(z_conn->fd, &msg, MSG_PEEK | (nowait ? MSG_DONTWAIT : 0)); - debug("RECVMSG: %d\n", ret); + debug("RECVMSG PEEK: %d\n", ret); if (ret == -1) return -1; @@ -571,7 +575,7 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait) if (ret == -1) return -1; - debug("RECV: %d [fl:%d]\n", ret, msg.msg_flags); + debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags); debug("CMSG: %zu\n", msg.msg_controllen); debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len); @@ -607,14 +611,14 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait) rxrpc_check_call(call); for (cursor = call->buffer_head; cursor; cursor = cursor->next) - debug("Recv buf=%p data=%p ioc=%u\n", + debug("RecvQ buf=%p data=%p iocur=%u\n", cursor, cursor->buf, cursor->io_cursor); /* Process the metadata */ if (msg.msg_flags & MSG_EOR) call->known_to_kernel = 0; if (!(msg.msg_flags & MSG_MORE)) - call->more = 0; + call->more_recv = false; for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { unsigned char *p; @@ -690,10 +694,16 @@ loop: case rx_call_cl_decoding_response: case rx_call_sv_decoding_opcode: case rx_call_sv_decoding_params: - if (call->data_count < call->need_size) { + if (call->need_size == UINT_MAX) { + /* A split mode call wants everything as it comes in, + * and is willing to settle for nothing also. + */ + if (call->data_count <= 0 && call->more_recv) + return 0; + } else if (call->data_count < call->need_size) { if (!(msg.msg_flags & MSG_MORE)) { /* Short data */ - call->error_code = EMSGSIZE; + call->error_code = ENODATA; rxrpc_abort_call(call, 1); } return 0; @@ -766,6 +776,7 @@ void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code) unsigned char control[128]; if (call->known_to_kernel) { + memset(control, 0, sizeof(control)); ctrllen = 0; RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call); RXRPC_ADD_ABORT(control, ctrllen, abort_code); @@ -802,6 +813,8 @@ void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code) free(cursor->buf); free(cursor); } + if (call->decoder_cleanup) + call->decoder_cleanup(call); free(call); } @@ -841,96 +854,6 @@ int rxrpc_run_sync_call(struct rx_call *call) } } -/* - * Slow path for decoding from data read from the buffer list. - */ -void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data) -{ - struct rx_buf *cursor, *new; - uint8_t *buf; - - rxrpc_check_call(call); - - if (call->error_code) - return; - if (call->data_cursor != call->data_stop) - abort(); - rxrpc_post_enc(call); - - new = calloc(1, sizeof(struct rx_buf)); - if (!new) - goto handle_oom; - new->buf = buf = malloc(RXGEN_BUFFER_SIZE); - if (!buf) - goto handle_oom_buf; - new->magic = RXGEN_BUF_MAGIC; - - *(net_xdr_t *)buf = data; - - cursor = call->buffer_tail; - cursor->next = new; - call->data_cursor = call->data_start = buf + sizeof(net_xdr_t); - call->data_stop = buf + RXGEN_BUFFER_SIZE; - call->buffer_space += RXGEN_BUFFER_SIZE - sizeof(net_xdr_t); - call->buffer_tail = new; - return; - -handle_oom_buf: - new->magic = RXGEN_BUF_DEAD; - free(new); -handle_oom: - call->error_code = ENOMEM; -} - -/* - * Slow path for decoding from data read from the buffer list. - */ -uint32_t rxrpc_dec_slow(struct rx_call *call) -{ - struct rx_buf *cursor, *spent; - net_xdr_t x; - unsigned count; - uint8_t *new_stop; - - rxrpc_check_call(call); - rxrpc_post_dec(call); - - /* More data may have come into a partially filled buffer from which we - * previously read. - */ - cursor = call->buffer_head; - count = cursor->io_cursor & ~3; - new_stop = cursor->buf + count; - if (call->data_stop >= new_stop) { - /* This buffer must then be completely used as we're required to check - * amount received before reading it. - */ - if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE) - abort(); /* Didn't completely consume a buffer */ - if (call->buffer_tail == cursor) - abort(); /* Unexpectedly out of data */ - - /* Move to the next buffer */ - spent = cursor; - cursor = cursor->next; - call->buffer_head = cursor; - - call->data_cursor = call->data_start = cursor->buf; - count = cursor->io_cursor & ~3; - if (count == 0) - abort(); - new_stop = cursor->buf + count; - spent->magic = RXGEN_BUF_DEAD; - free(spent->buf); - free(spent); - } - - call->data_stop = new_stop; - x = *(net_xdr_t *)call->data_cursor; - call->data_cursor += sizeof(x); - return ntohl(x); -} - /* * Discard excess received data */ @@ -965,7 +888,7 @@ int rxgen_dec_discard_excess(struct rx_call *call) /* * Encode a string of bytes */ -int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size) +int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size) { struct rx_buf *cursor, *new; uint8_t *buf; @@ -1020,35 +943,65 @@ handle_oom: } /* - * Decode a string of bytes + * Slow path for decoding from data read from the buffer list. */ -void rxrpc_dec_bytes(struct rx_call *call) +void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data) { - struct rx_buf *cursor, *spent; - unsigned needed, count; - uint8_t *new_stop; + rxrpc_enc_blob(call, &data, sizeof(data)); +} + +/* + * Decode a blob. + * + * This works progressively and so may need to be called multiple times for any + * particular set. + * + * call->blob_size indicates how many bytes we need to extract in total and + * call->blob_offset indicates how many we have done so far. blob_offset is + * updated before returning. + */ +void rxrpc_dec_blob(struct rx_call *call) +{ + unsigned needed, segment; rxrpc_check_call(call); rxrpc_post_dec(call); - needed = call->bulk_count - call->bulk_index; - count = call->data_stop - call->data_cursor; - if (count > 0) { - if (count > needed) - count = needed; - memcpy(call->bulk_item + call->bulk_index, call->data_cursor, count); - call->bulk_index += count; - call->data_cursor += count; - if (call->bulk_index >= call->bulk_count) + needed = call->blob_size - call->blob_offset; + segment = call->data_stop - call->data_cursor; + debug("DEC BLOB dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment); + + if (segment > 0) { + if (segment > needed) + segment = needed; + + debug("DEC BLOB copy %u\n", segment); + memcpy(call->blob + call->blob_offset, call->data_cursor, segment); + call->blob_decoded += segment; + call->blob_offset += segment; + call->data_cursor += segment; + if (call->blob_offset >= call->blob_size) return; } + rxrpc_dec_advance_buffer(call); +} + +/* + * Progress the buffer chain during decoding. + */ +void rxrpc_dec_advance_buffer(struct rx_call *call) +{ + struct rx_buf *cursor, *spent; + unsigned segment; + uint8_t *new_stop; + /* More data may have come into a partially filled buffer from which we * previously read. */ cursor = call->buffer_head; - count = cursor->io_cursor & ~3; - new_stop = cursor->buf + count; + segment = cursor->io_cursor; + new_stop = cursor->buf + segment; if (call->data_stop >= new_stop) { /* This buffer must then be completely used as we're required to check * amount received before reading it. @@ -1059,15 +1012,16 @@ void rxrpc_dec_bytes(struct rx_call *call) abort(); /* Unexpectedly out of data */ /* Move to the next buffer */ + rxrpc_post_dec(call); spent = cursor; cursor = cursor->next; call->buffer_head = cursor; call->data_cursor = call->data_start = cursor->buf; - count = cursor->io_cursor & ~3; - if (count == 0) + segment = cursor->io_cursor; + if (segment == 0) abort(); - new_stop = cursor->buf + count; + new_stop = cursor->buf + segment; spent->magic = RXGEN_BUF_DEAD; free(spent->buf); @@ -1077,3 +1031,33 @@ void rxrpc_dec_bytes(struct rx_call *call) call->data_stop = new_stop; rxrpc_check_call(call); } + +/* + * Slow path for decoding a 4-byte integer read from the buffer list. + */ +uint32_t rxrpc_dec_slow(struct rx_call *call) +{ + net_xdr_t x; + uint32_t ret; + + debug("DEC SLOW %lu %ld\n", + (unsigned long)call->data_cursor & 3, + call->data_cursor - call->data_stop); + + call->blob_size = 4; + call->blob_offset = 0; + call->blob = &x; + + /* We should only come here if there is sufficient data in the + * buffers for us to complete all the copy operation(s). + */ + if (call->data_count < call->blob_size) + abort(); + + while (call->blob_offset < call->blob_size) + rxrpc_dec_blob(call); + + ret = ntohl(x); + debug("DEC SLOW = %x\n", ret); + return ret; +} diff --git a/py_rxgen.c b/py_rxgen.c index 3d73397..85ed659 100644 --- a/py_rxgen.c +++ b/py_rxgen.c @@ -16,6 +16,19 @@ #include "afs_py.h" #include "rxgen.h" +#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0) + +struct py_dec_index { + unsigned long val; + void *ptr; +}; + +struct py_dec_manager { + Py_buffer view; + struct py_dec_index top; + struct py_dec_index indices[]; +}; + PyObject *py_rxgen_get_struct(const void *p, PyObject **cache, PyObject *(*data_to_type)(const void *elem)) { @@ -682,90 +695,24 @@ int py_rxgen_initialise_members(PyObject *obj, PyObject *kw) return 0; } -/* - * Decode a string of bytes into a preallocated unicode string python object. - */ -void py_dec_string(struct rx_call *call) -{ - PyObject *str = call->bulk_item; - struct rx_buf *cursor, *spent; - unsigned needed, count; - uint8_t *new_stop; - - if (PyUnicode_KIND(str) != PyUnicode_1BYTE_KIND) - abort(); - - rxrpc_post_dec(call); - - needed = call->bulk_count - call->bulk_index; - count = call->data_stop - call->data_cursor; - if (count > 0) { - if (count > needed) - count = needed; - memcpy(PyUnicode_DATA(str) + call->bulk_index, call->data_cursor, count); - call->bulk_index += count; - call->data_cursor += count; - rxrpc_dec_align(call); - if (call->bulk_count <= call->bulk_index) - return; - - } - - /* More data may have come into a partially filled buffer from which we - * previously read. - */ - cursor = call->buffer_head; - count = cursor->io_cursor & ~3; - new_stop = cursor->buf + count; - if (call->data_stop >= new_stop) { - /* This buffer must then be completely used as we're required to check - * amount received before reading it. - */ - if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE) - abort(); /* Didn't completely consume a buffer */ - if (call->buffer_tail == cursor) - abort(); /* Unexpectedly out of data */ - - /* Move to the next buffer */ - spent = cursor; - cursor = cursor->next; - call->buffer_head = cursor; - - call->data_cursor = call->data_start = cursor->buf; - count = cursor->io_cursor & ~3; - if (count == 0) - abort(); - new_stop = cursor->buf + count; - - free(spent->buf); - free(spent); - } - - call->data_stop = new_stop; -} - /* * Recursively encode a standard C array. */ static int py_enc_c_array(struct rx_call *call, const void *data, int dim, const Py_ssize_t *dim_size, - Py_ssize_t itemsize, - int align) + Py_ssize_t itemsize) { if (dim == 1) { /* Data subarray */ - rxrpc_enc_bytes(call, data, *dim_size * itemsize); - if (align) - rxrpc_enc_align(call); + rxrpc_enc_blob(call, data, *dim_size * itemsize); return rxrpc_post_enc(call); } else { /* Pointer subarray */ const void *const *ptrs = data; Py_ssize_t i; for (i = 0; i < *dim_size; i++) - if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize, - align) < 0) + if (py_enc_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0) return -1; return 0; } @@ -784,7 +731,7 @@ static int py_enc_numpy_array(struct rx_call *call, if (dim == 0) /* Single data item */ - return rxrpc_enc_bytes(call, data, itemsize); + return rxrpc_enc_blob(call, data, itemsize); for (i = 0; i < *dim_size; i++) { if (py_enc_numpy_array(call, data + i * *dim_stride, @@ -810,7 +757,7 @@ static int py_enc_pil_array(struct rx_call *call, if (dim == 0) /* Single data item */ - return rxrpc_enc_bytes(call, data, itemsize); + return rxrpc_enc_blob(call, data, itemsize); for (i = 0; i < *dim_size; i++) { const void *ptr = data + i * *dim_stride; @@ -832,22 +779,21 @@ static int py_enc_pil_array(struct rx_call *call, * Encode the just the contents of a python buffer view, without length or * realignment. */ -int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align) +int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim) { if (call->error_code) return -1; if (view->ndim == 0 || (view->ndim == 1 && !view->shape)) { /* Simple scalar array */ - if (rxrpc_enc_bytes(call, view->buf, view->len) < 0) + if (rxrpc_enc_blob(call, view->buf, view->len) < 0) return -1; return rxrpc_post_enc(call); } if (!view->strides) /* Standard C array */ - return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize, - align); + return py_enc_c_array(call, view->buf, view->ndim, view->shape, view->itemsize); if (!view->suboffsets) return py_enc_numpy_array(call, view->buf, view->ndim, view->shape, view->strides, @@ -863,17 +809,18 @@ int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int ali */ int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim) { + static unsigned zero; int i; if (call->error_code) return -1; if (0) { - printf("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize); + debug("PEBUF: l=%zu isz=%zd {", view->len, view->itemsize); if (view->shape) for (i = 0; i < view->ndim; i++) - printf(" %zd", view->shape[i]); - printf(" }\n"); + debug(" %zd", view->shape[i]); + debug(" }\n"); } if (view->len > dim) { @@ -882,167 +829,386 @@ int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim) } rxrpc_enc(call, view->len); - if (py_enc_buffer_raw(call, view, dim, true) == -1) + if (py_enc_buffer_raw(call, view, dim) == -1) return -1; - rxrpc_enc_align(call); + if (view->len & 3) + rxrpc_enc_blob(call, &zero, 4 - (view->len & 3)); return rxrpc_post_enc(call); } /* - * Recursively decode a standard C array. + * Recursively decode into a standard C array. */ -static int py_dec_c_array(struct rx_call *call, - void *data, int dim, - const Py_ssize_t *dim_size, - Py_ssize_t itemsize) +static int py_dec_c_array(struct rx_call *call, struct py_dec_manager *manager) { - if (dim == 1) { - /* Data subarray */ - call->bulk_item = data; - call->bulk_count = *dim_size * itemsize; - call->bulk_index = 0; - rxrpc_dec_bytes(call); - return rxrpc_post_dec(call); - } else { - /* Pointer subarray */ - void **ptrs = data; - Py_ssize_t i; - for (i = 0; i < *dim_size; i++) - if (py_dec_c_array(call, ptrs[i], dim - 1, dim_size + 1, itemsize) < 0) - return -1; - return 0; + unsigned long val; + int i; + + /* First of all, we finish off the blob we're currently decoding + * into. + */ + if (call->blob_size < manager->view.itemsize) { + rxrpc_dec_blob(call); + if (rxrpc_post_dec(call) < 0) + return -1; + if (call->blob_size < manager->view.itemsize) + goto need_more; + } + + /* Now we increment the per-dimension counters, overflowing and + * wrapping when a counter reaches the limit. When the outermost + * wraps, we are done. + */ + for (i = manager->view.ndim - 1; i >= 0; i--) { + val = manager->indices[i].val; + if (val + 1 < manager->view.shape[i]) { + manager->indices[i].val = val + 1; + goto incremented; + } + manager->indices[i].val = 0; } + + return 0; /* Complete */ + +incremented: + /* Recalculate the cached pointers */ + for (; i <= manager->view.ndim - 1; i++) { + void **ptrs = manager->indices[i - 1].ptr; + void *ptr = ptrs[manager->indices[i].val]; + manager->indices[i].ptr = ptr; + } + + call->blob_offset = 0; + call->blob = manager->indices[manager->view.ndim - 1].ptr; +need_more: + return 1; } /* - * Recursively decode a NumPy-style array. + * Recursively decode into a NumPy-style array. */ -static int py_dec_numpy_array(struct rx_call *call, - void *data, int dim, - const Py_ssize_t *dim_size, - const Py_ssize_t *dim_stride, - Py_ssize_t itemsize) +static int py_dec_numpy_array(struct rx_call *call, struct py_dec_manager *manager) { + unsigned long val; int i; - if (dim == 0) { - /* Single data item */ - call->bulk_item = data; - call->bulk_count = itemsize; - call->bulk_index = 0; - rxrpc_dec_bytes(call); - return 0; + /* First of all, we finish off the blob we're currently decoding + * into. + */ + if (call->blob_size < manager->view.itemsize) { + rxrpc_dec_blob(call); + if (rxrpc_post_dec(call) < 0) + return -1; + if (call->blob_size < manager->view.itemsize) + goto need_more; } - for (i = 0; i < *dim_size; i++) { - if (py_dec_numpy_array(call, data + i * *dim_stride, - dim - 1, dim_size + 1, dim_stride + 1, itemsize) < 0) - return -1; - if (dim == 1 && rxrpc_post_dec(call) < 0) - return -1; + /* Now we increment the per-dimension counters, overflowing and + * wrapping when a counter reaches the limit. When the outermost + * wraps, we are done. + */ + for (i = manager->view.ndim - 1; i >= 0; i--) { + val = manager->indices[i].val; + if (val + 1 < manager->view.shape[i]) { + manager->indices[i].val = val + 1; + goto incremented; + } + manager->indices[i].val = 0; } - return 0; + + return 0; /* Complete */ + +incremented: + /* Recalculate the cached pointers */ + for (; i <= manager->view.ndim - 1; i++) { + void *ptr = manager->indices[i - 1].ptr; + ptr += manager->indices[i].val * manager->view.strides[i]; + manager->indices[i].ptr = ptr; + } + + call->blob_offset = 0; + call->blob = manager->indices[manager->view.ndim - 1].ptr; +need_more: + return 1; } /* - * Recursively decode a Python Imaging Library (PIL)-style array. + * Recursively decode into a Python Imaging Library (PIL)-style array. */ -static int py_dec_pil_array(struct rx_call *call, - void *data, int dim, - const Py_ssize_t *dim_size, - const Py_ssize_t *dim_stride, - const Py_ssize_t *dim_suboffset, - Py_ssize_t itemsize) +static int py_dec_pil_array(struct rx_call *call, struct py_dec_manager *manager) { + unsigned long val; int i; - if (dim == 0) { - /* Single data item */ - call->bulk_item = data; - call->bulk_count = itemsize; - call->bulk_index = 0; - rxrpc_dec_bytes(call); - return 0; + /* First of all, we finish off the blob we're currently decoding + * into. + */ + if (call->blob_size < manager->view.itemsize) { + rxrpc_dec_blob(call); + if (rxrpc_post_dec(call) < 0) + return -1; + if (call->blob_size < manager->view.itemsize) + goto need_more; } - for (i = 0; i < *dim_size; i++) { - void *ptr = data + i * *dim_stride; - if (*dim_suboffset >= 0) { + /* Now we increment the per-dimension counters, overflowing and + * wrapping when a counter reaches the limit. When the outermost + * wraps, we are done. + */ + for (i = manager->view.ndim - 1; i >= 0; i--) { + val = manager->indices[i].val; + if (val + 1 < manager->view.shape[i]) { + manager->indices[i].val = val + 1; + goto incremented; + } + manager->indices[i].val = 0; + } + + return 0; /* Complete */ + +incremented: + /* Recalculate the cached pointers */ + for (; i <= manager->view.ndim - 1; i++) { + void *ptr = manager->indices[i - 1].ptr; + ptr += manager->indices[i].val * manager->view.strides[i]; + if (manager->view.suboffsets[i] >= 0) { ptr = *((void *const *)ptr); - ptr += *dim_suboffset; + ptr += manager->view.suboffsets[i]; } - if (py_dec_pil_array(call, ptr, - dim - 1, dim_size + 1, dim_stride + 1, dim_suboffset + 1, - itemsize) < 0) - return -1; - if (dim == 1 && rxrpc_post_dec(call) < 0) - return -1; + manager->indices[i].ptr = ptr; } - return 0; + + call->blob_offset = 0; + call->blob = manager->indices[manager->view.ndim - 1].ptr; +need_more: + return 1; } /* * Decode the contents of an opaque type */ -int py_dec_opaque(struct rx_call *call, PyObject *obj) +int py_dec_into_buffer(struct rx_call *call) { - Py_buffer view; - int i; + struct py_dec_manager *manager = call->decoder_manager; + int ret; if (call->error_code) return -1; - if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0) - return -1; + if (call->blob == &rxgen_dec_padding_sink) { + rxrpc_dec_blob(call); + if (rxrpc_post_dec(call) == -1) + return -1; + return (call->blob_offset == call->blob_size) ? 0 : 1; + } - if (0) { - printf("PEBUF: l=%zu isz=%zd {", view.len, view.itemsize); - if (view.shape) - for (i = 0; i < view.ndim; i++) - printf(" %zd", view.shape[i]); - printf(" }\n"); + if (manager->view.ndim == 0 || (manager->view.ndim == 1 && !manager->view.shape)) { + /* Single scalar */ + rxrpc_dec_blob(call); + if (rxrpc_post_dec(call) == -1) + return -1; + ret = (call->blob_offset == call->blob_size) ? 0 : 1; + } else if (!manager->view.strides) + ret = py_dec_c_array(call, manager); + else if (!manager->view.suboffsets) + ret = py_dec_numpy_array(call, manager); + else + ret = py_dec_pil_array(call, manager); + + if (ret <= 0) { + PyBuffer_Release(&manager->view); + free(call->decoder_manager); + call->decoder_manager = NULL; } - if (view.ndim == 0 || (view.ndim == 1 && !view.shape)) { - /* Simple scalar array */ - call->bulk_item = view.buf; - call->bulk_count = view.len; - call->bulk_index = 0; - rxrpc_dec_bytes(call); - if (rxrpc_post_dec(call) < 0) - goto error; - goto done; + if (ret == 0 && call->padding_size > 0) { + /* Soak up the padding to a 32-bit boundary */ + call->blob = &rxgen_dec_padding_sink; + call->blob_size = 4 - (call->blob_size & 3); + call->blob_offset = 0; + return 1; + } + + return ret; +} + +/* + * Initialise buffer decode. + * + * Returns 0 if there's nothing to do, 1 if the decode is set up and -1 on error, + */ +int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded) +{ + struct py_dec_manager *manager = NULL; + size_t size; + int i; + + if (call->decoder_manager) { + manager = call->decoder_manager; + PyBuffer_Release(&manager->view); + free(manager); + manager = NULL; + call->decoder_manager = NULL; + } + + debug("INIT_BUFFER: l=%zu isz=%zd nd=%d\n", view->len, view->itemsize, view->ndim); + call->need_size = view->len; + if (view->len == 0) { + PyBuffer_Release(view); + return 0; + } + + call->padding_size = padded ? 4 - (view->len & 3) : 0; + + size = sizeof(struct py_dec_manager); + size += view->ndim * sizeof(struct py_dec_index); + manager = malloc(size); + if (!manager) { + PyBuffer_Release(view); + PyErr_NoMemory(); + return -1; } - if (!view.strides) { + memset(manager, 0, size); + memcpy(&manager->view, view, sizeof(*view)); + manager->indices[-1].ptr = manager->view.buf; + + call->blob_size = manager->view.itemsize; + + if (manager->view.ndim == 0) { + call->blob_size = manager->view.len; + } else if (manager->view.ndim == 1 && !manager->view.shape) { + manager->indices[0].ptr = manager->indices[-1].ptr; + call->blob_size = manager->view.len; + } else if (!manager->view.strides) { /* Standard C array */ - if (py_dec_c_array(call, view.buf, view.ndim, view.shape, view.itemsize) < 0) - goto error; - goto done; + for (i = 0; i <= manager->view.ndim - 1; i++) { + void **ptrs = manager->indices[i - 1].ptr; + void *ptr = ptrs[manager->indices[i].val]; + manager->indices[i].ptr = ptr; + } + } else if (!manager->view.suboffsets) { + /* NumPy-style Python array */ + for (i = 0; i <= manager->view.ndim - 1; i++) { + void *ptr = manager->indices[i - 1].ptr; + ptr += manager->indices[i].val * manager->view.strides[i]; + manager->indices[i].ptr = ptr; + } + } else { + /* PIL-style Python array */ + for (i = 0; i <= manager->view.ndim - 1; i++) { + void *ptr = manager->indices[i - 1].ptr; + ptr += manager->indices[i].val * manager->view.strides[i]; + if (manager->view.suboffsets[i] >= 0) { + ptr = *((void *const *)ptr); + ptr += manager->view.suboffsets[i]; + } + manager->indices[i].ptr = ptr; + } } - if (!view.suboffsets) { - if (py_dec_numpy_array(call, view.buf, view.ndim, view.shape, view.strides, - view.itemsize) < 0) - goto error; - goto done; + /* Set the first blob to be decoded */ + call->blob = manager->indices[manager->view.ndim - 1].ptr; + call->blob_offset = 0; + call->decoder_manager = manager; + return 1; +} + +/* + * Initialise the decoding of the contents of an opaque type + */ +int py_dec_init_opaque(struct rx_call *call, PyObject *obj) +{ + Py_buffer view; + int ret; + + if (call->error_code) + return -1; + + if (PyObject_GetBuffer(obj, &view, PyBUF_FULL) < 0) { + debug("*** GET BUFFER FAILED\n"); + return -1; } - /* PIL-style Python array */ - if (py_dec_pil_array(call, view.buf, view.ndim, view.shape, view.strides, - view.suboffsets, view.itemsize) < 0) - goto error; + ret = py_dec_init_buffer(call, &view, true); + if (ret == -1) + return -1; -done: - PyBuffer_Release(&view); - return call->error_code ? -1 : 0; + return (ret < 0 || call->error_code) ? -1 : 0; +} -error: - PyBuffer_Release(&view); - return -1; +/* + * Decode a string of bytes into a preallocated unicode string python object. + */ +int py_dec_into_string(struct rx_call *call) +{ + PyObject *str = call->blob; + unsigned needed, segment, i; + + rxrpc_post_dec(call); + + needed = call->blob_size - call->blob_offset; + segment = call->data_stop - call->data_cursor; + debug("DEC STR dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment); + + if (segment > 0) { + if (segment > needed) + segment = needed; + if (call->blob != &rxgen_dec_padding_sink) { + for (i = 0; i < segment; i++) + PyUnicode_WRITE(PyUnicode_KIND(str), PyUnicode_DATA(str), + call->blob_offset + i, call->data_cursor[i]); + } + call->blob_decoded += segment; + call->blob_offset += segment; + call->data_cursor += segment; + if (call->blob_size <= call->blob_offset) { + if (call->blob != &rxgen_dec_padding_sink && call->padding_size > 0) { + /* Soak up the padding to a 32-bit boundary */ + call->blob = &rxgen_dec_padding_sink; + call->blob_size = call->padding_size; + call->blob_offset = 0; + return 1; + } + return 0; + } + } + + rxrpc_dec_advance_buffer(call); + return 1; +} + +/* + * Decode a string of bytes into a preallocated unicode string python object. + */ +int py_dec_init_string(struct rx_call *call, PyObject **_str) +{ + PyObject *str; + + debug("INIT STRING %u\n", call->blob_size); + + str = PyUnicode_New(call->blob_size, 255); + if (!str) + return -1; + *_str = str; + if (call->blob_size == 0) + return 0; + + if (PyUnicode_READY(str) < 0) { + debug("*** STRING NON-CANONICAL\n"); + abort(); + } + + call->blob = str; + call->blob_offset = 0; + call->padding_size = 4 - (call->blob_size & 3); + return 1; } +/* + * Comparator for binary searching the abort code table + */ static int py_rxgen_received_abort_cmp(const void *key, const void *_entry) { const struct kafs_abort_list *entry = _entry; @@ -1085,3 +1251,17 @@ PyObject *py_rxgen_received_abort(struct rx_call *call) else return PyErr_Format(ex, "Aborted %u", call->abort_code); } + +/* + * Clean up a call after decoding + */ +void py_rxgen_decoder_cleanup(struct rx_call *call) +{ + if (call->decoder_manager) { + struct py_dec_manager *manager = call->decoder_manager; + PyBuffer_Release(&manager->view); + free(manager); + } + Py_XDECREF(call->decoder_split_callback); + Py_XDECREF(call->decoder_split_info); +} diff --git a/py_rxgen.h b/py_rxgen.h index eb98fe5..ffe8965 100644 --- a/py_rxgen.h +++ b/py_rxgen.h @@ -12,6 +12,8 @@ #ifndef _PY_RXGEN_H #define _PY_RXGEN_H +#include "rxgen.h" + struct py_rx_connection { PyObject_HEAD struct rx_connection *x; @@ -39,10 +41,13 @@ enum py_rx_split_state { split_dead }; -struct py_rx_split_call { +struct py_rx_split_info { PyObject_HEAD struct rx_call *call; - enum py_rx_split_state state; + PyObject *target; + enum py_rx_split_state split_state; + unsigned state; + bool receiving_data; }; extern PyTypeObject py_rx_connectionType; @@ -51,6 +56,7 @@ extern PyObject *kafs_py_rx_new_connection(PyObject *, PyObject *); extern PyObject *kafs_py_string_to_key(PyObject *, PyObject *); extern int py_rxgen_initialise_members(PyObject *obj, PyObject *kw); +extern void py_rxgen_decoder_cleanup(struct rx_call *call); /* * Single embedded struct handling @@ -67,10 +73,13 @@ extern int py_rxgen_premarshal_struct(void *p, size_t size, size_t offs, */ extern PyObject *py_rxgen_get_string(const void *_p, size_t n); extern int py_rxgen_set_string(void *_p, size_t n, PyObject *val); -extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim, int align); +extern int py_enc_buffer_raw(struct rx_call *call, Py_buffer *view, size_t dim); extern int py_enc_buffer(struct rx_call *call, Py_buffer *view, size_t dim); -extern void py_dec_string(struct rx_call *call); -extern int py_dec_opaque(struct rx_call *call, PyObject *obj); +extern int py_dec_into_buffer(struct rx_call *call); +extern int py_dec_init_buffer(struct rx_call *call, Py_buffer *view, bool padded); +extern int py_dec_init_opaque(struct rx_call *call, PyObject *obj); +extern int py_dec_into_string(struct rx_call *call); +extern int py_dec_init_string(struct rx_call *call, PyObject **_str); /* * Embedded general array handling @@ -118,15 +127,21 @@ extern PyObject *py_rxgen_received_abort(struct rx_call *call); /* * Split-mode RPC call handling */ -extern PyTypeObject py_rx_split_callType; +extern PyTypeObject py_rx_split_infoType; extern PyObject *py_rxgen_split_client_prepare(void); -extern int py_rxgen_split_client_transmit(PyObject *split, PyObject *split_call); -extern int py_rxgen_split_client_receive(PyObject *split, PyObject *split_call); +extern int py_rxgen_split_transmit(struct rx_call *call); +extern int py_rxgen_split_receive(struct rx_call *call, bool init); -static inline void py_rxgen_split_client_set(PyObject *_split_call, struct rx_call *call) +static inline void py_rxgen_split_client_set(struct rx_call *call, + PyObject *split_callback, + PyObject *_split_info) { - struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call; - split_call->call = call; + struct py_rx_split_info *split_info = (struct py_rx_split_info *)_split_info; + + split_info->call = call; + Py_INCREF(split_callback); + call->decoder_split_callback = split_callback; + call->decoder_split_info = _split_info; } #endif /* _PY_RXGEN_H */ diff --git a/py_rxsplit.c b/py_rxsplit.c index fb6e17f..ddfe85f 100644 --- a/py_rxsplit.c +++ b/py_rxsplit.c @@ -18,69 +18,158 @@ #include "afs_py.h" #include "rxgen.h" +#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0) + +static int py_rx_split_do_recv(struct rx_call *call, + struct py_rx_split_info *split_info) +{ + struct pollfd fds[1]; + + fds[0].fd = call->conn->fd; + fds[0].events = POLLIN; + fds[0].revents = 0; + poll(fds, 1, 0); + if (fds[0].revents & POLLIN) + rxrpc_recv_data(call->conn, true); + return 0; +} + +static int py_rx_split_do_send_recv(struct rx_call *call, + struct py_rx_split_info *split_info, + bool more) +{ + if (!more) { + call->more_send = 0; + split_info->split_state = split_idle; + } + if (rxrpc_send_data(call) == -1) + return -1; + + return py_rx_split_do_recv(call, split_info); +} + /* - * Send an RPC call: split.send(data, more) + * Send an RPC call: split_info.send(data, more) */ static PyObject *py_rx_split_send(PyObject *_self, PyObject *args) { - struct py_rx_split_call *self = (struct py_rx_split_call *)_self; - struct rx_call *call = self->call; - struct pollfd fds[1]; + struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self; + struct rx_call *call = split_info->call; Py_buffer data; - int more; + int more = 0; if (!call) abort(); - if (self->state != split_transmitting) + if (split_info->split_state != split_transmitting) abort(); if (!PyArg_ParseTuple(args, "y*p", &data, &more)) return NULL; - if (py_enc_buffer_raw(call, &data, UINT_MAX, false) == -1) + if (py_enc_buffer_raw(call, &data, UINT_MAX) == -1) return PyErr_SetFromErrno(PyExc_OSError); - if (!more) { - call->more = 0; - self->state = split_idle; - } - if (rxrpc_send_data(call) == -1) + if (py_rx_split_do_send_recv(call, split_info, more) < 0) return PyErr_SetFromErrno(PyExc_OSError); - fds[0].fd = call->conn->fd; - fds[0].events = POLLIN; - fds[0].revents = 0; - poll(fds, 1, 0); - if (fds[0].revents & POLLIN) - rxrpc_recv_data(call->conn, true); + Py_RETURN_NONE; +} + +/* + * Request reception of some data, where the amount of data is potentially + * zero. This can only be used if there is no non-split data to be received + * after the split data as this commits the caller to handling all the + * remaining data in this phase. + * + * The caller should use split_info.data_available() to work out how much data + * is available, if any. + */ +static PyObject *py_rx_split_will_recv_all(PyObject *_self, PyObject *args) +{ + struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self; + struct rx_call *call = split_info->call; + call->need_size = UINT_MAX; Py_RETURN_NONE; } /* - * Recv an RPC call: v = split.recv(n) + * Receive a fixed-size object: ret = split_info.begin_recv(buffer) + * + * The object's intrinsic size is used to determine how much data we want for + * it. Potentially 0-length, variable sized data cannot be handled this way. + * + * Returns NULL on error; True on success (ie. may be more to receive). */ -static PyObject *py_rx_split_recv(PyObject *_self, PyObject *args) +static PyObject *py_rx_split_begin_recv(PyObject *_self, PyObject *args) { - struct py_rx_split_call *self = (struct py_rx_split_call *)_self; - struct rx_call *call = self->call; + struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self; + struct rx_call *call = split_info->call; + Py_buffer buffer; + int ret, padded = 1; - if (self->state != split_receiving) + if (!call) + abort(); + if (split_info->split_state != split_receiving) abort(); - abort(); - return NULL; + if (!PyArg_ParseTuple(args, "w*|p", &buffer, &padded)) + return NULL; + + ret = py_dec_init_buffer(call, &buffer, padded); + switch (ret) { + case -1: return NULL; + case 0: split_info->state++; Py_RETURN_TRUE; + case 1: split_info->receiving_data = true; Py_RETURN_TRUE; + default: abort(); + } +} + +/* + * Query how much data is in the Rx buffers of an RPC call: n = split_info.data_available() + */ +static PyObject *py_rx_split_data_available(PyObject *_self, PyObject *args) +{ + struct py_rx_split_info *split_info = (struct py_rx_split_info *)_self; + struct rx_call *call = split_info->call; + + switch (call->state) { + case rx_call_cl_decoding_response: + case rx_call_sv_decoding_opcode: + case rx_call_sv_decoding_params: + break; + default: + /* Aborted or some other error */ + return NULL; + } + + debug("DAVAIL: %u %d\n", call->data_count, call->more_recv); + + if (call->data_count > 0 || call->more_recv) + return PyLong_FromUnsignedLong(call->data_count); + else + Py_RETURN_NONE; } /* * Methods applicable to split-call objects */ -static PyMethodDef py_rx_split_methods[] = { +static PyMethodDef py_rx_split_info_methods[] = { {"send", (PyCFunction)py_rx_split_send, METH_VARARGS, "" }, - {"recv", (PyCFunction)py_rx_split_recv, METH_VARARGS, + {"will_recv_all", (PyCFunction)py_rx_split_will_recv_all, METH_NOARGS, "" }, + {"begin_recv", (PyCFunction)py_rx_split_begin_recv, METH_VARARGS, + "" }, + {"data_available", (PyCFunction)py_rx_split_data_available, METH_NOARGS, + "" }, + {} +}; + +static PyMemberDef py_rx_split_info_members[] = { + { "state", T_UINT, offsetof(struct py_rx_split_info, state), 0, ""}, + { "target", T_OBJECT, offsetof(struct py_rx_split_info, target), 0, ""}, {} }; @@ -90,24 +179,29 @@ static PyMethodDef py_rx_split_methods[] = { static int py_rx_split_init(PyObject *_self, PyObject *args, PyObject *kwds) { - struct py_rx_split_call *self = (struct py_rx_split_call *)_self; + struct py_rx_split_info *self = (struct py_rx_split_info *)_self; self->call = NULL; + self->target = NULL; + self->split_state = split_idle; + self->state = 0; + self->receiving_data = false; return 0; } static void -py_rx_split_dealloc(struct py_rx_split_call *self) +py_rx_split_dealloc(struct py_rx_split_info *self) { - assert(self->state != split_dead); - self->state = split_dead; + assert(self->split_state != split_dead); + self->split_state = split_dead; self->call = NULL; + Py_XDECREF(self->target); Py_TYPE(self)->tp_free((PyObject *)self); } -PyTypeObject py_rx_split_callType = { +PyTypeObject py_rx_split_infoType = { PyVarObject_HEAD_INIT(NULL, 0) - "kafs.rx_split_call", /*tp_name*/ - sizeof(struct py_rx_split_call), /*tp_basicsize*/ + "kafs.rx_split_info", /*tp_name*/ + sizeof(struct py_rx_split_info), /*tp_basicsize*/ 0, /*tp_itemsize*/ (destructor)py_rx_split_dealloc, /*tp_dealloc*/ 0, /*tp_print*/ @@ -132,8 +226,8 @@ PyTypeObject py_rx_split_callType = { 0, /* tp_weaklistoffset */ 0, /* tp_iter */ 0, /* tp_iternext */ - py_rx_split_methods, /* tp_methods */ - 0, /* tp_members */ + py_rx_split_info_methods, /* tp_methods */ + py_rx_split_info_members, /* tp_members */ 0, /* tp_getset */ 0, /* tp_base */ 0, /* tp_dict */ @@ -146,13 +240,13 @@ PyTypeObject py_rx_split_callType = { }; /* - * Check a split-RPC object has transmit and receive functions. + * Allocate an information object. */ PyObject *py_rxgen_split_client_prepare(void) { - struct py_rx_split_call *obj; + struct py_rx_split_info *obj; - obj = (struct py_rx_split_call *)_PyObject_New(&py_rx_split_callType); + obj = (struct py_rx_split_info *)_PyObject_New(&py_rx_split_infoType); if (!obj) return PyErr_NoMemory(); py_rx_split_init((PyObject *)obj, NULL, NULL); @@ -162,23 +256,27 @@ PyObject *py_rxgen_split_client_prepare(void) /* * Perform split-RPC transmission. */ -int py_rxgen_split_client_transmit(PyObject *split, PyObject *_split_call) +int py_rxgen_split_transmit(struct rx_call *call) { - struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call; + struct py_rx_split_info *split_info = call->decoder_split_info; + PyObject *callback = call->decoder_split_callback; PyObject *result; int ret; - assert(split_call->state == split_idle); + assert(split_info->split_state == split_idle); - split_call->state = split_transmitting; - result = PyObject_CallMethod(split, "client_transmit", "O", _split_call); - if (result) { + split_info->split_state = split_transmitting; + result = PyObject_CallMethod(callback, "transmit", + "O", call->decoder_split_info); + if (result == Py_None) { + ret = py_rx_split_do_send_recv(call, split_info, false); + } else if (result) { ret = PyObject_IsTrue(result) ? 0 : -1; Py_DECREF(result); } else { ret = -1; } - if (ret == 0 && split_call->state != split_idle) + if (ret == 0 && split_info->split_state != split_idle) abort(); return ret; } @@ -186,24 +284,62 @@ int py_rxgen_split_client_transmit(PyObject *split, PyObject *_split_call) /* * Perform split-RPC reception. */ -int py_rxgen_split_client_receive(PyObject *split, PyObject *_split_call) +int py_rxgen_split_receive(struct rx_call *call, bool init) { - struct py_rx_split_call *split_call = (struct py_rx_split_call *)_split_call; + struct py_rx_split_info *split_info = call->decoder_split_info; + PyObject *callback = call->decoder_split_callback; PyObject *result; int ret; - assert(split_call->state == split_idle); + assert(split_info->split_state == split_idle); - split_call->state = split_receiving; - result = PyObject_CallMethod(split, "client_receive", "O", _split_call); - split_call->state = split_idle; - assert(result != NULL); + debug("-->%s(%u,%u,%u)\n", __func__, init, split_info->state, split_info->receiving_data); - if (result) { - ret = PyObject_IsTrue(result) ? 0 : -1; - Py_DECREF(result); + if (init) + split_info->state = 0; + +again: + if (split_info->receiving_data) { + switch (py_dec_into_buffer(call)) { + case -1: + result = PyObject_CallMethod(callback, "receive_failed", + "O", call->decoder_split_info); + Py_XDECREF(result); + return -1; + case 1: /* More data needed */ + return 1; + case 0: + split_info->receiving_data = false; + split_info->state++; + break; + } + } + + split_info->split_state = split_receiving; + result = PyObject_CallMethod(callback, "receive", + "O", call->decoder_split_info); + split_info->split_state = split_idle; + + if (!result) + return -1; + + if (result == Py_None) { + /* Split receive phase not used */ + call->need_size = 0; + ret = 0; + } else if (result == Py_False) { + /* Split receive phase complete */ + ret = 0; + } else if (result == Py_True) { + /* Wanting to receive data or reenter in a new state */ + if (split_info->receiving_data) + goto again; + ret = 1; } else { + PyErr_Format(PyExc_TypeError, + "Expected True, False or None return from split receive function"); ret = -1; } + Py_DECREF(result); return ret; } diff --git a/rxgen.h b/rxgen.h index 3ac09a2..eeb123d 100644 --- a/rxgen.h +++ b/rxgen.h @@ -15,6 +15,7 @@ #include "af_rxrpc.h" #include #include +#include typedef uint32_t net_xdr_t; @@ -64,12 +65,13 @@ struct rx_call { enum rx_call_state state; unsigned known_to_kernel : 1; unsigned secured : 1; - unsigned more : 1; + unsigned more_send : 1; + unsigned more_recv : 1; int error_code; uint32_t abort_code; unsigned need_size; - unsigned long long bytes_sent, bytes_received; + unsigned long long bytes_sent, bytes_received, blob_decoded; /* Service routines */ void (*processor)(struct rx_call *call); @@ -83,11 +85,15 @@ struct rx_call { struct rx_buf *buffer_tail; unsigned data_count; unsigned buffer_space; + unsigned padding_size; /* Decoding support */ unsigned phase; /* Encode/decode phase */ + unsigned blob_size; /* Size of blob being encoded/decoded */ + unsigned blob_offset; /* Offset into blob */ unsigned bulk_count; /* Number of items in bulk array */ unsigned bulk_index; /* Index of item being processed */ + void *blob; /* Blob being encoded/decoded */ union { void *bulk_item; /* Pointer to string/bytes/struct being processed */ uint32_t bulk_u32; /* 8/16/32-bit integer being processed */ @@ -96,15 +102,21 @@ struct rx_call { }; int (*decoder)(struct rx_call *call); void *decoder_private; + void *decoder_manager; + void *decoder_split_callback; + void *decoder_split_info; + void (*decoder_cleanup)(struct rx_call *call); }; +extern uint32_t rxgen_dec_padding_sink; -extern int rxrpc_enc_bytes(struct rx_call *call, const void *data, size_t size); +extern int rxrpc_enc_blob(struct rx_call *call, const void *data, size_t size); extern void rxrpc_enc_slow(struct rx_call *call, net_xdr_t data); static inline void rxrpc_enc(struct rx_call *call, uint32_t data) { uint32_t xdr_data = htonl(data); - if (__builtin_expect(call->data_cursor < call->data_stop, 1)) { + if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 && + call->data_cursor < call->data_stop, 1)) { *(net_xdr_t *)call->data_cursor = xdr_data; call->data_cursor += 4; } else { @@ -114,6 +126,7 @@ static inline void rxrpc_enc(struct rx_call *call, uint32_t data) static inline void rxrpc_enc_align(struct rx_call *call) { + abort(); // Can't assume data_cursor is 4-byte aligned while ((unsigned long)call->data_cursor & 3) *(call->data_cursor++) = 0; } @@ -132,11 +145,12 @@ static inline int rxrpc_post_enc(struct rx_call *call) } } -extern void rxrpc_dec_bytes(struct rx_call *call); +extern void rxrpc_dec_blob(struct rx_call *call); extern uint32_t rxrpc_dec_slow(struct rx_call *call); static inline uint32_t rxrpc_dec(struct rx_call *call) { - if (__builtin_expect(call->data_cursor < call->data_stop, 1)) { + if (__builtin_expect(((unsigned long)call->data_cursor & 3) == 0 && + call->data_cursor < call->data_stop, 1)) { net_xdr_t x = *(net_xdr_t *)call->data_cursor; call->data_cursor += sizeof(x); return ntohl(x); @@ -148,6 +162,7 @@ static inline uint32_t rxrpc_dec(struct rx_call *call) static inline void rxrpc_dec_align(struct rx_call *call) { unsigned long cursor = (unsigned long)call->data_cursor; + abort(); // Can't assume data_cursor is 4-byte aligned cursor = (cursor + 3) & ~3UL; call->data_cursor = (uint8_t*)cursor; } @@ -158,6 +173,7 @@ static inline int rxrpc_post_dec(struct rx_call *call) size_t n = call->data_cursor - call->data_start; call->data_start = call->data_cursor; call->data_count -= n; + call->need_size -= n; return 0; } else { errno = call->error_code; @@ -165,6 +181,7 @@ static inline int rxrpc_post_dec(struct rx_call *call) } } +extern void rxrpc_dec_advance_buffer(struct rx_call *call); extern int rxgen_dec_discard_excess(struct rx_call *call); extern struct rx_connection *rx_new_connection(const struct sockaddr *sa, diff --git a/rxgen/emit_c_sync_funcs.pm b/rxgen/emit_c_sync_funcs.pm index bf6deac..493eeac 100644 --- a/rxgen/emit_c_sync_funcs.pm +++ b/rxgen/emit_c_sync_funcs.pm @@ -34,10 +34,7 @@ sub emit_func_prototype($) if ($p->{class} eq "array") { die $p->{where}, ": Array arg not supported"; - } elsif ($p->{class} eq "bulk" && - !($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque") - ) { + } elsif ($p->{class} eq "bulk") { # Encode if ($p->{elem}->{class} eq "struct") { $proto = "int (*get__" . $p->{name} . ")(struct rx_call *call, void *token)"; @@ -63,8 +60,7 @@ sub emit_func_prototype($) push @declines, "void *token__" . $p->{name}; push @declines, "size_t nr__" . $p->{name}; push @args, "token__" . $p->{name}; - } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque")) { + } elsif ($p->{class} eq "blob") { $proto = $p->{type} . " " . $p->{ptr} . $p->{name}; push @enclines, "size_t nr__" . $p->{name}; push @enclines, "const " . $proto; @@ -191,12 +187,38 @@ sub emit_func_encode($$$$) ) { $phase->{size} += $p->{xdr_size}; push @{$phase->{params}}, $p; + } elsif ($p->{class} eq "blob") { + $have_bulk = 2; + + # Bulk objects begin with an element count + $phase->{elem_count} = $phase->{size}; + $phase->{size} += 4; + + my %pseudoparam = ( + class => "basic", + type => "blob_size", + name => $p->{name}, + elem => $p->{elem}, + where => $p->{where}, + xdr_size => $p->{xdr_size}, + ); + push @{$phase->{params}}, \%pseudoparam; + + # Create a new phase + $phase = { + type => "blob", + name => $p->{name}, + elem => $p->{elem}, + params => [ $p ], + xdr_size => $p->{xdr_size}, + size => $p->{xdr_size}, + }; + push @phases, $phase; + + $phase->{size} = 4; + $phase = 0; } elsif ($p->{class} eq "bulk") { $have_bulk = 1 if ($have_bulk == 0); - if ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque") { - $have_bulk = 2; - } # Bulk objects begin with an element count $phase->{elem_count} = $phase->{size}; @@ -223,16 +245,11 @@ sub emit_func_encode($$$$) }; push @phases, $phase; - if ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque") { - $phase->{size} = 4; - } else { - # We don't want to be sending one object at a time if they're - # really small. - my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1; - $n_buf *= $p->{xdr_size}; - $phase->{size} = $p->{xdr_size}; - } + # We don't want to be sending one object at a time if they're + # really small. + my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1; + $n_buf *= $p->{xdr_size}; + $phase->{size} = $p->{xdr_size}; $phase = 0; } else { die $p->{where}, "Encode array arg not supported"; @@ -248,7 +265,7 @@ sub emit_func_encode($$$$) unless (@params) { die if ($side eq "client"); - print RXOUT "\tcall->more = 0;\n"; + print RXOUT "\tcall->more_send = 0;\n"; print RXOUT "\treturn 0;\n"; print RXOUT "}\n"; return; @@ -263,7 +280,7 @@ sub emit_func_encode($$$$) print RXOUT "\tswitch (phase) {\n"; print RXOUT "\tcase 0:\n"; - print RXOUT "\t\tcall->more = 1;\n"; + print RXOUT "\t\tcall->more_send = 1;\n"; my $phase_goto_label = 0; my $phix; @@ -277,7 +294,14 @@ sub emit_func_encode($$$$) } # Determine how big bulk objects are - if ($phase->{type} eq "bulk") { + if ($phase->{type} eq "blob") { + my $p = $phase->{params}->[0]; + print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n"; + print RXOUT "\t\tif (call->blob_size == 0)\n"; + print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n"; + $phase_goto_label = $phix + 1; + print RXOUT "\t\tcall->blob_offset = 0;\n"; + } elsif ($phase->{type} eq "bulk") { my $p = $phase->{params}->[0]; print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n"; print RXOUT "\t\tif (call->bulk_count == 0)\n"; @@ -292,7 +316,11 @@ sub emit_func_encode($$$$) # Marshal the data foreach my $p (@{$phase->{params}}) { - if ($p->{type} eq "bulk_size") { + if ($p->{type} eq "blob_size") { + print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n"; + $close_phase = 0; + next; + } elsif ($p->{type} eq "bulk_size") { print RXOUT "\t\trxrpc_enc(call, ", $ptr, "nr__", $p->{name}, ");\n"; $close_phase = 0; next; @@ -337,7 +365,12 @@ sub emit_func_encode($$$$) die $p->{where}, ": Unsupported type in decode"; } - if ($p->{class} eq "bulk") { + if ($p->{class} eq "blob") { + print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n"; + print RXOUT "\t\t\tphase = ", $phix, ";\n"; + print RXOUT "\t\t\tgoto select_phase;\n"; + print RXOUT "\t\t}\n"; + } elsif ($p->{class} eq "bulk") { print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n"; print RXOUT "\t\t\tphase = ", $phix, ";\n"; print RXOUT "\t\t\tgoto select_phase;\n"; @@ -356,7 +389,7 @@ sub emit_func_encode($$$$) print RXOUT "\tcase ", $phix, ":\n"; print RXOUT "\t\tif (rxrpc_post_enc(call) < 0)\n"; print RXOUT "\t\t\treturn -1;\n"; - print RXOUT "\t\tcall->more = 0;\n"; + print RXOUT "\t\tcall->more_send = 0;\n"; print RXOUT "\t\tbreak;\n"; print RXOUT "\t}\n"; @@ -397,6 +430,33 @@ sub emit_func_decode($$$$) ) { $phase->{size} += $p->{xdr_size}; push @{$phase->{params}}, $p; + } elsif ($p->{class} eq "blob") { + $have_bulk = 1; + + # Bulk objects begin with an element count + $phase->{elem_count} = $phase->{size}; + $phase->{size} += 4; + + my %pseudoparam = ( + class => "basic", + type => "blob_size", + name => $p->{name}, + elem => $p->{elem}, + where => $p->{where}, + xdr_size => $p->{xdr_size}, + ); + push @{$phase->{params}}, \%pseudoparam; + + # Create a new phase + $phase = { + type => "blob", + name => $p->{name}, + params => [ $p ], + size => 4, + xdr_size => $p->{xdr_size}, + }; + push @phases, $phase; + $phase = 0; } elsif ($p->{class} eq "bulk") { $have_bulk = 1; @@ -423,11 +483,6 @@ sub emit_func_decode($$$$) }; push @phases, $phase; - if ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque") { - $phase->{bytearray} = 1; - } - # We don't want to be asking recvmsg() for one object at a time if # they're really small. my $n_buf = ($p->{xdr_size} < 1020) ? int(1020 / $p->{xdr_size}) : 1; @@ -475,16 +530,23 @@ sub emit_func_decode($$$$) } # Determine how big bulk objects are - if ($phase->{type} eq "bulk") { + if ($phase->{type} eq "blob") { + my $p = $phase->{params}->[0]; + print RXOUT "\t\tcall->blob_size = ", $ptr, "nr__", $p->{name}, ";\n"; + print RXOUT "\t\tcall->blob_offset = UINT_MAX;\n"; + print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n"; + print RXOUT "\t\t\treturn -1;\n"; + print RXOUT "\t\tif (call->blob_size == 0)\n"; + print RXOUT "\t\t\tgoto phase_", $phix + 1, ";\n"; + $phase_goto_label = $phix + 1; + print RXOUT "\t\tcall->blob_offset = 0;\n"; + } elsif ($phase->{type} eq "bulk") { my $p = $phase->{params}->[0]; print RXOUT "\t\tcall->bulk_count = ", $ptr, "nr__", $p->{name}, ";\n"; print RXOUT "\t\tcall->bulk_index = UINT_MAX;\n"; if ($p->{elem}->{class} eq "basic") { print RXOUT "\t\tif (", $ptr, "store__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n"; - } elsif ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque") { - print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n"; } else { print RXOUT "\t\tif (", $ptr, "alloc__", $p->{name}, "(call, &", $ptr, "token__", $p->{name}, ") < 0)\n"; } @@ -502,8 +564,9 @@ sub emit_func_decode($$$$) print RXOUT "\tcase ", $phix, ":\n"; print RXOUT "\t\tif (count < ", $phase->{size}, ")"; - if ($phase->{type} eq "bulk" && !exists($phase->{bytearray}) && - $phase->{xdr_size} <= 512) { + if ($phase->{type} eq "bulk" && + $phase->{xdr_size} <= 512 + ) { print RXOUT " {\n"; print RXOUT "\t\t\tunsigned n = call->bulk_count - call->bulk_index;\n"; print RXOUT "\t\t\tn = MIN(n, ", int(1024 / $phase->{xdr_size}), ");\n"; @@ -518,7 +581,10 @@ sub emit_func_decode($$$$) # Unmarshal the data print RXOUT "\n"; foreach my $p (@{$phase->{params}}) { - if ($p->{type} eq "bulk_size") { + if ($p->{type} eq "blob_size") { + print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n"; + next; + } elsif ($p->{type} eq "bulk_size") { print RXOUT "\t\t", $ptr, "nr__", $p->{name}, " = rxrpc_dec(call);\n"; next; } @@ -542,9 +608,8 @@ sub emit_func_decode($$$$) print RXOUT "\t\t\treturn -1;\n"; print RXOUT "\t\trxgen_decode_", $p->{type}, "(call, call->bulk_item);\n"; print RXOUT "\t\tcall->bulk_index++;\n"; - } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque")) { - print RXOUT "\t\trxrpc_dec_bytes(call);\n"; + } elsif ($p->{class} eq "blob") { + print RXOUT "\t\trxrpc_dec_blob(call);\n"; print RXOUT "\t\trxrpc_dec_align(call);\n"; } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) { print RXOUT "\t\t", $ptr, $p->{name}, " = rxrpc_dec(call);\n"; @@ -557,7 +622,14 @@ sub emit_func_decode($$$$) die $p->{where}, ": Unsupported type in decode"; } - if ($p->{class} eq "bulk") { + if ($p->{class} eq "blob") { + print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n"; + print RXOUT "\t\t\treturn -1;\n"; + print RXOUT "\t\tif (call->blob_offset < call->blob_size) {\n"; + print RXOUT "\t\t\tphase = ", $phix, ";\n"; + print RXOUT "\t\t\tgoto select_phase;\n"; + print RXOUT "\t\t}\n"; + } elsif ($p->{class} eq "bulk") { print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n"; print RXOUT "\t\t\treturn -1;\n"; print RXOUT "\t\tif (call->bulk_index < call->bulk_count) {\n"; @@ -567,7 +639,7 @@ sub emit_func_decode($$$$) } } - if ($phase->{type} ne "bulk") { + if ($phase->{type} ne "blob" || $phase->{type} ne "bulk") { print RXOUT "\t\tif (rxrpc_post_dec(call) < 0)\n"; print RXOUT "\t\t\treturn -1;\n"; } @@ -629,31 +701,35 @@ sub emit_func_send($$) print RXOUT "\tstruct rx_call *call;\n" if ($what eq "request"); - my @all_bulk_params = grep { $_->{class} eq "bulk"; } @{$params}; - - my @bulk_params = grep { ($_->{elem}->{class} ne "string" && - $_->{elem}->{class} ne "opaque"); } @all_bulk_params; + my @blob_params = grep { $_->{class} eq "blob"; } @{$params}; + my @bulk_params = grep { $_->{class} eq "bulk"; } @{$params}; # Local variables print RXOUT "\tint ret;\n"; # Check lengths - if (@all_bulk_params) { + if (@blob_params || @bulk_params) { print RXOUT "\n"; print RXOUT "\tif ("; my $first = 1; - foreach my $p (@all_bulk_params) { + foreach my $p (@blob_params) { if ($first) { $first = 0; } else { print RXOUT " ||\n\t "; } - if ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque") { - print RXOUT "!", $p->{name}; + print RXOUT "!", $p->{name}; + if (exists($p->{dim})) { + print RXOUT " || nr__", $p->{name}, " > ", $p->{dim}; + } + } + foreach my $p (@bulk_params) { + if ($first) { + $first = 0; } else { - print RXOUT "!get__", $p->{name}; + print RXOUT " ||\n\t "; } + print RXOUT "!get__", $p->{name}; if (exists($p->{dim})) { print RXOUT " || nr__", $p->{name}, " > ", $p->{dim}; } @@ -685,10 +761,23 @@ sub emit_func_send($$) print RXOUT "\trxrpc_enc(call, (uint32_t)(", $p->{name}, " >> 32));\n"; } elsif ($p->{class} eq "struct") { print RXOUT "\trxgen_encode_", $p->{type}, "(call, ", $p->{name}, ");\n"; - } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque")) { - print RXOUT "\trxrpc_enc_bytes(call, ", $p->{name}, ", nr__", $p->{name}, ");\n"; + } elsif ($p->{class} eq "blob") { + print RXOUT "\trxrpc_enc_blob(call, ", $p->{name}, ", nr__", $p->{name}, ");\n"; print RXOUT "\trxrpc_enc_align(call);\n"; + } elsif ($p->{class} eq "blob") { + print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n"; + print RXOUT "\tcall->blob_size = nr__", $p->{name}, ";\n"; + print RXOUT "\tfor (call->blob_offset = 0; call->blob_offset < call->blob_size; call->blob_offset++) {\n"; + if ($p->{elem}->{class} eq "struct") { + print RXOUT "\t\tstruct ", $p->{elem}->{type}, " x;\n"; + } else { + print RXOUT "\t\t", $p->{elem}->{type}, " x;\n"; + } + print RXOUT "\t\tcall->blob = &x;\n"; + print RXOUT "\t\tif (get__", $p->{name}, "(call, token__", $p->{name}, ") < 0)\n"; + print RXOUT "\t\t\tgoto error;\n"; + die $p->{where}, "No decoding for array type '$type'"; + print RXOUT "\t}\n"; } elsif ($p->{class} eq "bulk") { print RXOUT "\trxrpc_enc(call, nr__", $p->{name}, ");\n"; print RXOUT "\tcall->bulk_count = nr__", $p->{name}, ";\n"; @@ -723,7 +812,7 @@ sub emit_func_send($$) print RXOUT "\tif (rxrpc_post_enc(call) < 0)\n"; print RXOUT "\t\tgoto error;\n"; - print RXOUT "\tcall->more = 0;\n"; + print RXOUT "\tcall->more_send = 0;\n"; # Send the message print RXOUT "\n"; diff --git a/rxgen/emit_py_module.pm b/rxgen/emit_py_module.pm index 1760eb0..6511a1e 100644 --- a/rxgen/emit_py_module.pm +++ b/rxgen/emit_py_module.pm @@ -85,7 +85,7 @@ sub emit_py_module() { if (@py_type_defs) { print PYOUT "\tif ("; print PYOUT "PyType_Ready(&py_rx_connectionType) < 0 ||\n\t "; - print PYOUT "PyType_Ready(&py_rx_split_callType) < 0"; + print PYOUT "PyType_Ready(&py_rx_split_infoType) < 0"; my $first = 0; foreach my $def (@py_type_defs) { print PYOUT " ||\n\t " unless ($first); diff --git a/rxgen/emit_py_sync_funcs.pm b/rxgen/emit_py_sync_funcs.pm index 821bf24..dd3c609 100644 --- a/rxgen/emit_py_sync_funcs.pm +++ b/rxgen/emit_py_sync_funcs.pm @@ -54,7 +54,7 @@ sub emit_py_func_param_object($$) { push @complex, $p; print PYHDR "\t\tPyObject\t*", $p->{name}, ";\n"; } - $have_opaque = 1 if ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque"); + $have_opaque = 1 if ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque"); } print PYHDR "\t} x;\n"; print PYHDR "\tPy_buffer dec_buf;\n" if ($have_opaque); @@ -96,7 +96,8 @@ sub emit_py_func_param_object($$) { print PYOUT "static PyMemberDef ", $struct_req, "_members[] = {\n"; foreach my $p (@{$params}) { print PYOUT "\t{ \"", $p->{name}, "\", "; - if ($p->{class} eq "bulk") { print PYOUT "T_OBJECT_EX"; + if ($p->{class} eq "blob") { print PYOUT "T_OBJECT_EX"; + } elsif ($p->{class} eq "bulk") { print PYOUT "T_OBJECT_EX"; } elsif ($p->{type} eq "char" ) { print PYOUT "T_CHAR"; } elsif ($p->{type} eq "int8_t" ) { print PYOUT "T_BYTE"; } elsif ($p->{type} eq "int16_t" ) { print PYOUT "T_SHORT"; @@ -174,7 +175,6 @@ sub emit_py_func_bulk_helper($) foreach my $p (@{$func->{params}}) { next if ($p->{class} ne "bulk"); - next if ($p->{elem}->{class} eq "string" || $p->{elem}->{class} eq "opaque"); # Data encoding if (!exists $bulk_get_helpers{$p->{type}}) { @@ -255,8 +255,7 @@ sub emit_py_func_simple_sync_call($) print PYOUT "\tstruct py_rx_connection *z_conn;\n"; print PYOUT "\tstruct py_", $func->{name}, "_response *response;\n"; foreach my $p (@{$func->{request}}) { - if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque")) { + if ($p->{class} eq "blob") { print PYOUT "\tPy_buffer param_", $p->{name}, ";\n"; } elsif ($p->{class} eq "basic") { print PYOUT "\t", $p->{type}, " param_", $p->{name}, ";\n"; @@ -272,7 +271,7 @@ sub emit_py_func_simple_sync_call($) } } - print PYOUT "\tPyObject *split, *split_call;\n" if ($func->{split}); + print PYOUT "\tPyObject *split_callback, *split_info;\n" if ($func->{split}); print PYOUT "\tPyObject *res = NULL;\n"; print PYOUT "\tint ret;\n"; @@ -292,12 +291,11 @@ sub emit_py_func_simple_sync_call($) } elsif ($p->{type} eq "uint32_t") { print PYOUT "I"; } elsif ($p->{type} eq "uint64_t") { print PYOUT "K"; } elsif ($p->{class} eq "struct") { print PYOUT "O!"; - } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") { + } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "string") { print PYOUT "s*"; - } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") { + } elsif ($p->{class} eq "blob" && $p->{elem}->{class} eq "opaque") { print PYOUT "z*"; - } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" || - $p->{elem}->{class} eq "struct")) { + } elsif ($p->{class} eq "bulk") { print PYOUT "O!"; } else { die $p->{where}, ": No py parse for param"; @@ -311,55 +309,53 @@ sub emit_py_func_simple_sync_call($) foreach my $p (@{$func->{request}}) { print PYOUT ",\n"; print PYOUT "\t\t\t "; - if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque")) { - print PYOUT "¶m_", $p->{name}; - } elsif ($p->{class} eq "basic") { + if ($p->{class} eq "basic") { print PYOUT "¶m_", $p->{name}; } elsif ($p->{class} eq "struct") { print PYOUT "&py_", $p->{type}, "Type, ¶m_", $p->{name}; + } elsif ($p->{class} eq "blob") { + print PYOUT "¶m_", $p->{name}; } elsif ($p->{class} eq "bulk") { print PYOUT "&PyList_Type, ¶m_", $p->{name}; } else { die $p->{where}, ": Unsupported type \"", $p->{type}, "\""; } } - print PYOUT ",\n\t\t\t &split" if ($func->{split}); + print PYOUT ",\n\t\t\t &split_callback" if ($func->{split}); print PYOUT "))\n"; print PYOUT "\t\treturn NULL;\n"; if ($func->{split}) { print PYOUT "\n"; - print PYOUT "\tsplit_call = py_rxgen_split_client_prepare();\n"; - print PYOUT "\tif (!split_call)\n"; + print PYOUT "\tsplit_info = py_rxgen_split_client_prepare();\n"; + print PYOUT "\tif (!split_info)\n"; print PYOUT "\t\treturn NULL;\n"; } print PYOUT "\n"; print PYOUT "\tcall = rxrpc_alloc_call(z_conn->x, 0);\n"; print PYOUT "\tif (!call) {\n"; - print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split}); + print PYOUT "\t\tPy_XDECREF(split_info);\n" if ($func->{split}); print PYOUT "\t\treturn PyErr_NoMemory();\n"; print PYOUT "\t}\n"; - print PYOUT "\tpy_rxgen_split_client_set(split_call, call);\n" if ($func->{split}); + print PYOUT "\tcall->decoder_cleanup = py_rxgen_decoder_cleanup;\n"; + print PYOUT "\tpy_rxgen_split_client_set(call, split_callback, split_info);\n" + if ($func->{split}); # Marshal the arguments print PYOUT "\n"; print PYOUT "\trxrpc_enc(call, ", $func->{opcode}, ");\n"; foreach my $p (@{$func->{request}}) { - if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" || - $p->{elem}->{class} eq "struct")) { - print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n"; - print PYOUT "\t\tgoto error;\n"; - } elsif ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque")) { + if ($p->{class} eq "blob") { my $dim = -1; $dim = $p->{dim} if exists $p->{dim}; print PYOUT "\tif (py_enc_buffer(call, ¶m_", $p->{name}, ", ", $dim, ") < 0) {\n"; print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n"; - print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split}); print PYOUT "\t\treturn NULL;\n"; print PYOUT "\t}\n"; + } elsif ($p->{class} eq "bulk") { + print PYOUT "\tif (py_encode_bulk_", $p->{type}, "(call, param_", $p->{name}, ") < 0)\n"; + print PYOUT "\t\tgoto error;\n"; } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) { print PYOUT "\trxrpc_enc(call, param_", $p->{name}, ");\n"; } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) { @@ -368,7 +364,6 @@ sub emit_py_func_simple_sync_call($) } elsif ($p->{class} eq "struct") { print PYOUT "\tif (py_premarshal_", $p->{type}, "((PyObject *)param_", $p->{name}, ")) {\n"; print PYOUT "\t\trxrpc_terminate_call(call, EINVAL);\n"; - print PYOUT "\t\tPy_XDECREF(split_call);\n" if ($func->{split}); print PYOUT "\t\treturn NULL;\n"; print PYOUT "\t}\n"; print PYOUT "\trxgen_encode_", $p->{type}, "(call, ¶m_", $p->{name}, "->x);\n"; @@ -393,10 +388,10 @@ sub emit_py_func_simple_sync_call($) # Transmit the split data if ($func->{split}) { - print PYOUT "\tif (py_rxgen_split_client_transmit(split, split_call) < 0)\n"; + print PYOUT "\tif (py_rxgen_split_transmit(call) < 0)\n"; print PYOUT "\t\tgoto error_no_res;\n"; } else { - print PYOUT "\tcall->more = 0;\n"; + print PYOUT "\tcall->more_send = 0;\n"; # Make the call print PYOUT "\n"; @@ -418,7 +413,6 @@ sub emit_py_func_simple_sync_call($) # Successful return print PYOUT "\n"; print PYOUT "\trxrpc_terminate_call(call, 0);\n"; - print PYOUT "\tPy_XDECREF(split_call);\n" if ($func->{split}); print PYOUT "\treturn res;\n"; # Error cleanups @@ -460,11 +454,27 @@ sub emit_py_func_decode($$$$) # in the previous phase. Variable-sized bulk arrays are split across # multiple phases, with the length being at the end of the first phase and # the data in the second. + # + # We also need to interpolate a phase to deal with decoding split-op + # auxiliary data. This comes last when decoding the request and first when + # decoding the response. + # my @phases = (); my $phase = 0; my $have_bulk = 0; my $want_item = 0; + if ($func->{split} && $subname eq "response") { + $phase = { + type => "split", + size => "py_rxgen_split_receive(call)", + params => [] + }; + push @phases, $phase; + $phase = 0; + $have_bulk = 1; + } + foreach my $p (@params) { unless ($phase) { $phase = { type => "flat", size => 0, params => [] }; @@ -476,6 +486,36 @@ sub emit_py_func_decode($$$$) ) { $phase->{size} += $p->{xdr_size}; push @{$phase->{params}}, $p; + } elsif ($p->{class} eq "blob") { + $have_bulk = 1; + + # Bulk objects begin with an element count + $phase->{elem_count} = $phase->{size}; + $phase->{size} += 4; + + my %pseudoparam = ( + class => "basic", + type => "blob_size", + name => $p->{name}, + elem => $p->{elem}, + where => $p->{where}, + xdr_size => $p->{xdr_size}, + ); + push @{$phase->{params}}, \%pseudoparam; + + # Create a new phase + $phase = { + type => "blob", + name => $p->{name}, + params => [ $p ], + xdr_size => $p->{xdr_size}, + }; + push @phases, $phase; + + # We don't want to be asking recvmsg() for one object at a time if + # they're really small. + $phase->{size} = $p->{xdr_size}; + $phase = 0; } elsif ($p->{class} eq "bulk") { $have_bulk = 1; @@ -502,12 +542,7 @@ sub emit_py_func_decode($$$$) }; push @phases, $phase; - if ($p->{elem}->{class} eq "string" || - $p->{elem}->{class} eq "opaque") { - ; - } else { - $want_item = 1; - } + $want_item = 1; # We don't want to be asking recvmsg() for one object at a time if # they're really small. @@ -518,19 +553,31 @@ sub emit_py_func_decode($$$$) } } + if ($func->{split} && $subname eq "request") { + $phase = { + type => "split", + size => "py_rxgen_split_receive(call)", + params => [] + }; + push @phases, $phase; + $phase = 0; + $have_bulk = 1; + } + # Function definition and arguments print PYOUT "\n"; print PYOUT "int py_", $func->{name}, "_decode_", $subname, "(struct rx_call *call)\n"; print PYOUT "{\n"; - unless (@params) { + unless (@params || $func->{split}) { print PYOUT "\treturn 0;\n"; print PYOUT "}\n"; return; } # Local variables - print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n"; + print PYOUT "\tstruct py_", $func->{name}, "_", $subname, " *obj = call->decoder_private;\n" + if (@params); print PYOUT "\tPyObject *item;\n" if ($want_item); print PYOUT "\tunsigned phase = call->phase;\n"; print PYOUT "\tunsigned count;\n"; @@ -539,6 +586,7 @@ sub emit_py_func_decode($$$$) print PYOUT "\n"; print PYOUT "select_phase:\n" if ($have_bulk); print PYOUT "\tcount = call->data_count;\n"; + #print PYOUT "\tprintf(\"-- Phase %u (%u) --\\n\", phase, count);\n"; print PYOUT "\tswitch (phase) {\n"; print PYOUT "\tcase 0:\n"; @@ -555,23 +603,32 @@ sub emit_py_func_decode($$$$) } # Determine how big bulk objects are - if ($phase->{type} eq "bulk") { + if ($phase->{type} eq "blob") { my $p = $phase->{params}->[0]; - if ($p->{elem}->{class} eq "basic" || - $p->{elem}->{class} eq "struct") { - print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n"; - print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n"; - print PYOUT "\t\t\treturn -1;\n"; - } elsif ($p->{elem}->{class} eq "string") { - print PYOUT "\t\tobj->x.", $p->{name}, " = PyUnicode_New(call->bulk_count, 255);\n"; - print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n"; - print PYOUT "\t\t\treturn -1;\n"; - print PYOUT "\t\tcall->bulk_item = obj->x.", $p->{name}, ";\n"; + if ($p->{elem}->{class} eq "string") { + print PYOUT "\t\tswitch (py_dec_init_string(call, &obj->x.", $p->{name}, ")) {\n"; } elsif ($p->{elem}->{class} eq "opaque") { print PYOUT "\t\tobj->x.", $p->{name}, " = PyByteArray_FromStringAndSize(\"\", 0);\n"; print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n"; print PYOUT "\t\t\treturn -1;\n"; - print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->bulk_count) == -1)\n"; + print PYOUT "\t\tif (PyByteArray_Resize(obj->x.", $p->{name}, ", call->blob_size) == -1)\n"; + print PYOUT "\t\t\treturn -1;\n"; + + print PYOUT "\t\tswitch (py_dec_init_opaque(call, obj->x.", $p->{name}, ")) {\n"; + } else { + die; + } + print PYOUT "\t\tcase -1: return -1;\n"; + print PYOUT "\t\tcase 0: goto phase_", $phix + 1, ";\n"; + print PYOUT "\t\tcase 1: break;\n"; + print PYOUT "\t\t}\n"; + $phase_goto_label = $phix + 1; + } elsif ($phase->{type} eq "bulk") { + my $p = $phase->{params}->[0]; + if ($p->{elem}->{class} eq "basic" || + $p->{elem}->{class} eq "struct") { + print PYOUT "\t\tobj->x.", $p->{name}, " = PyList_New(call->bulk_count);\n"; + print PYOUT "\t\tif (!obj->x.", $p->{name}, ")\n"; print PYOUT "\t\t\treturn -1;\n"; } else { die; @@ -584,24 +641,40 @@ sub emit_py_func_decode($$$$) } # Entry point for a phase - print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n"; + if ($phase->{type} ne "split") { + print PYOUT "\t\tcall->need_size = ", $phase->{size}, ";\n"; + } else { + print PYOUT "\t\tif (py_rxgen_split_receive(call, 1) < 0)\n"; + print PYOUT "\t\t\treturn -1;\n"; + print PYOUT "\t\tif (call->need_size == 0)\n"; + print PYOUT "\t\t\tgoto phase_", $phix + 1, ";\n"; + $phase_goto_label = $phix + 1; + } print PYOUT "\t\tcall->phase = ", $phix, ";\n"; print PYOUT "\tcase ", $phix, ":\n"; - print PYOUT "\t\tif (count < ", $phase->{size}, ")\n"; - print PYOUT "\t\t\treturn 1;\n"; + if ($phase->{type} ne "split") { + print PYOUT "\t\tif (count < call->need_size)\n"; + print PYOUT "\t\t\treturn 1;\n"; + } else { + print PYOUT "\t\tif (call->need_size == UINT_MAX ? count == 0 : count < call->need_size) {\n"; + #print PYOUT "\t\t\tprintf(\"NEED %u (phase %u)\\n\", call->need_size, phase);\n"; + print PYOUT "\t\t\treturn 1;\n"; + print PYOUT "\t\t}\n"; + } # Unmarshal the data print PYOUT "\n"; foreach my $p (@{$phase->{params}}) { - if ($p->{type} eq "bulk_size") { + if ($p->{type} eq "blob_size") { + print PYOUT "\t\tcall->blob_size = rxrpc_dec(call);\n"; + next; + } elsif ($p->{type} eq "bulk_size") { print PYOUT "\t\tcall->bulk_count = rxrpc_dec(call);\n"; next; } - if ($p->{class} eq "bulk" && ($p->{elem}->{class} eq "basic" || - $p->{elem}->{class} eq "struct") - ) { + if ($p->{class} eq "bulk") { if ($p->{elem}->{class} eq "struct") { print PYOUT "\t\titem = py_decode_", $p->{type}, "(call);\n"; } elsif ($p->{elem}->{xdr_size} == 4 && $p->{type} =~ /^u/) { @@ -626,10 +699,16 @@ sub emit_py_func_decode($$$$) print PYOUT "\t\t\treturn -1;\n"; print PYOUT "\t\tcall->bulk_index++;\n"; - } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "string") { - print PYOUT "\t\tpy_dec_string(call);\n"; - } elsif ($p->{class} eq "bulk" && $p->{elem}->{class} eq "opaque") { - print PYOUT "\t\tpy_dec_opaque(call, obj->x.", $p->{name}, ");\n"; + } elsif ($p->{class} eq "blob") { + if ($p->{elem}->{class} eq "string") { + print PYOUT "\t\tswitch (py_dec_into_string(call)) {\n"; + } else { + print PYOUT "\t\tswitch (py_dec_into_buffer(call)) {\n"; + } + print PYOUT "\t\tcase -1: return -1;\n"; + print PYOUT "\t\tcase 0: break;\n"; + print PYOUT "\t\tcase 1: phase = ", $phix, "; goto select_phase;\n"; + print PYOUT "\t\t}\n"; } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 4) { print PYOUT "\t\tobj->x.", $p->{name}, " = (", $p->{type}, ")rxrpc_dec(call);\n"; } elsif ($p->{class} eq "basic" && $p->{xdr_size} == 8) { @@ -641,7 +720,14 @@ sub emit_py_func_decode($$$$) die $p->{where}, ": Unsupported type in decode"; } - if ($p->{class} eq "bulk") { + if ($p->{class} eq "blob" && $p->{elem}->{class} eq "string") { + print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n"; + print PYOUT "\t\t\treturn -1;\n"; + print PYOUT "\t\tif (call->blob_offset < call->blob_size) {\n"; + print PYOUT "\t\t\tphase = ", $phix, ";\n"; + print PYOUT "\t\t\tgoto select_phase;\n"; + print PYOUT "\t\t}\n"; + } elsif ($p->{class} eq "bulk") { print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n"; print PYOUT "\t\t\treturn -1;\n"; print PYOUT "\t\tif (call->bulk_index < call->bulk_count) {\n"; @@ -651,7 +737,21 @@ sub emit_py_func_decode($$$$) } } - if ($phase->{type} ne "bulk") { + if ($phase->{type} eq "split") { + print PYOUT "\t\tswitch (py_rxgen_split_receive(call, 0)) {\n"; + print PYOUT "\t\tcase -1: return -1;\n"; + print PYOUT "\t\tcase 0: break;\n"; + print PYOUT "\t\tcase 1: phase = ", $phix, "; goto select_phase;\n"; + print PYOUT "\t\t}\n"; + #print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n"; + #print PYOUT "\t\t\treturn -1;\n"; + #print PYOUT "\t\tif (call->need_size != 0) {\n"; + #print PYOUT "\t\t\tphase = ", $phix, ";\n"; + #print PYOUT "\t\t\tgoto select_phase;\n"; + #print PYOUT "\t\t}\n"; + } + + if ($phase->{type} ne "bulk" && $phase->{type} ne "blob") { print PYOUT "\t\tif (rxrpc_post_dec(call) < 0)\n"; print PYOUT "\t\t\treturn -1;\n"; } diff --git a/rxgen/rxgen.pl b/rxgen/rxgen.pl index 9a2e18a..9781a11 100755 --- a/rxgen/rxgen.pl +++ b/rxgen/rxgen.pl @@ -63,7 +63,7 @@ my $where = ""; # # Each type is specified by a hash of the following elements: # -# class Complexity class (basic, string, struct, array, bulk) +# class Complexity class (basic, string, struct, blob, array, bulk) # type Basic/struct type (char, {u,}int{8,16,32,64}_t, opaque, struct name) # elem Ref to element type def (if array/bulk) # multi 1 if array or bulk type, 0 otherwise @@ -119,8 +119,16 @@ sub define_typedef($$$) my %combined = %{$type}; + if (exists $flags->{class} && $flags->{class} eq "bulk") { + if ($type->{class} eq "string" || + $type->{class} eq "opaque") { + $flags->{class} = "blob"; + } + } + if (exists $flags->{class} && - ($flags->{class} eq "bulk" || + ($flags->{class} eq "blob" || + $flags->{class} eq "bulk" || $flags->{class} eq "array")) { die $where, ": Typedef'ing array/bulk as array/bulk not supported\n" if ($type->{multi}); @@ -314,7 +322,7 @@ discarded_comments: if ($line =~ /([a-zA-Z_][a-zA-Z0-9_]*) ([a-zA-Z_][a-zA-Z0-9_]*);/) { my %member = %{look_up_type($1)}; die $where, ": Don't support bulk constructs in structs\n" - if ($member{class} eq "bulk"); + if ($member{class} eq "bulk" || $member{class} eq "blob"); $member{name} = $2; $member{where} = $where; push $struct->{members}, \%member; @@ -425,7 +433,12 @@ discarded_comments: if ($bulk_dim) { die $where, ": Bulk-of-bulk parameters not supported\n" if ($param{class} eq "bulk"); - $param{class} = "bulk"; + if ($type->{class} eq "string" || + $type->{class} eq "opaque") { + $param{class} = "blob"; + } else { + $param{class} = "bulk"; + } $param{elem} = $type; $param{dim} = $bulk_dim unless $bulk_dim eq -1; } @@ -573,6 +586,11 @@ foreach $func (@funcs) { ; } elsif ($p->{class} eq "struct") { die unless (exists $p->{xdr_size}); + } elsif ($p->{class} eq "blob") { + die $p->{where}, ": No element type" unless (exists $p->{elem}); + if (exists $p->{dim}) { + die $where, ": Missing constant ", $p->{dim} unless exists $constants{$p->{dim}}; + } } elsif ($p->{class} eq "bulk") { die $p->{where}, ": No element type" unless (exists $p->{elem}); die $p->{where}, ": Element has no XDR size" unless (exists $p->{elem}->{xdr_size}); diff --git a/suite/commands/bos/getlog.py b/suite/commands/bos/getlog.py new file mode 100644 index 0000000..8abf115 --- /dev/null +++ b/suite/commands/bos/getlog.py @@ -0,0 +1,121 @@ +# +# AFS Server management toolkit: Server log fetcher +# -*- coding: utf-8 -*- +# + +__copyright__ = """ +Copyright (C) 2014 Red Hat, Inc. All Rights Reserved. +Written by David Howells (dhowells@redhat.com) + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public Licence version 2 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public Licence for more details. + +You should have received a copy of the GNU General Public Licence +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +""" + +from afs.exception import AFSException +from afs.argparse import * +from afs.lib.output import * +import kafs +import os + +help = "Print a server process's log file" + +command_arguments = [ + [ "server", get_bosserver, "rs", "" ], + [ "file", get_file_name, "rs", "" ], + [ "cell", get_cell, "os", "" ], + [ "noauth", get_auth, "fn" ], + [ "localauth", get_auth, "fn" ], + [ "verbose", get_verbose, "fn" ], + [ "encrypt", get_dummy, "fn" ], +] + +cant_combine_arguments = [ + ( "cell", "localauth" ), + ( "noauth", "localauth" ), +] + +argument_size_limits = { + "file" : kafs.BOZO_BSSIZE, +} + +description = r""" +Print a server process's log file +""" + +class split_handler: + def __init__(self): + pass + + def transmit(self, split_info): + return None + + # Receive state machine. Returns True if more data is required to be read + # or another state must be transited to and False if the operation is now + # complete. + # + def receive(self, split_info): + # We want to receive everything we can - the entire response phase + # belongs to the split handler + if split_info.state == 0: + split_info.will_recv_all() + split_info.state = 1 + return True + + if split_info.state == 1: + avail = split_info.data_available() + if avail == None: + # Last byte read + return False + if avail == 0: + split_info.will_recv_all() + return True + + if avail > 4096: + avail = 4096 + buf = bytearray(avail) + split_info.target = buf + split_info.state = 2 + + # Request reception start. This function will be reentered with + # phase incremented when it's done. receive_failed() will be + # called instead upon failure. + return split_info.begin_recv(buf, False) + + if split_info.state == 3: + buf = split_info.target + split_info.target = None + + # Strip any terminal NUL chars + buf = buf.rstrip(b"\0") + output_raw(buf) + del buf + + split_info.state = 1 + return True + + raise AFSException("Unexpected receive phase ", split_info.phase, + " in getlog() split.receive()") + + def receive_failed(self, split_info): + print("Receive failed in phase", split_info.phase) + split_info.target = None + +def main(params): + cell = params["cell"] + bos_conn = cell.open_bos_server(params["server"], params) + + split = split_handler() + + output("Fetching log file '", params["file"], "'...\n") + output_flush() + ret = kafs.BOZO_GetLog(bos_conn, params["file"], split) diff --git a/suite/commands/bos/install.py b/suite/commands/bos/install.py index e9ada32..0de39f0 100644 --- a/suite/commands/bos/install.py +++ b/suite/commands/bos/install.py @@ -61,7 +61,7 @@ class split_handler: self.__size = size self.__pos = 0 - def client_transmit(self, split_call): + def transmit(self, split_info): while self.__pos < self.__size: verbose("--- XMIT ", self.__pos, "/", self.__size, "\n") size = self.__size - self.__pos @@ -72,11 +72,11 @@ class split_handler: buf = self.__fd.read(size) if len(buf) < size: raise IOError("Short read on file " + self.__name) - split_call.send(buf, self.__pos < self.__size) + split_info.send(buf, self.__pos < self.__size) return True - def client_receive(self, split_call): - return True + def receive(self, split_info, inited): + return None def main(params): cell = params["cell"] diff --git a/suite/lib/output.py b/suite/lib/output.py index 17906f7..fe5b932 100644 --- a/suite/lib/output.py +++ b/suite/lib/output.py @@ -50,6 +50,13 @@ def output(*args): for i in args: sys.stdout.write(str(i)) +def output_raw(*args): + for i in args: + sys.stdout.buffer.write(i) + +def output_flush(): + sys.stdout.flush() + def outputf(formatstr, *args): sys.stdout.write(formatstr.format(*args)) diff --git a/suite/main.py b/suite/main.py index b4b936f..b4ecc35 100644 --- a/suite/main.py +++ b/suite/main.py @@ -172,7 +172,7 @@ def _main(): except KeyboardInterrupt: sys.exit(1) except SystemExit: - pass + sys.exit(0) except: print('Unhandled exception:', file=sys.stderr) traceback.print_exc() -- 2.49.0