]> www.infradead.org Git - users/dhowells/kafs-utils.git/commitdiff
Dynamically load the transport interface as a shared library
authorDavid Howells <dhowells@redhat.com>
Fri, 25 Sep 2015 22:21:50 +0000 (23:21 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 3 Mar 2017 15:37:40 +0000 (15:37 +0000)
Make it possible to change the transport interface by abstracting it out
into a shared library and loading it dynamically by name.

kafs/main.py currently loads it by specific name:

    rpc.load_kafs_transport("/usr/local/lib64/kafs_af_rxrpc.so")

but this needs altering so that the name of the transport can be supplied
by the user.

File kafs/transport.h defines the interface as a table of function pointers
defined as:

struct kafs_transport kafs_transport = {
.id = kafs_transport_id,
.name = "<name of transport>"
...
};

The ->new_service(), ->shutdown_service() and ->accept_call() methods
aren't currently used.  Each transport interface can define the following
structures in any way it sees fit:

struct kafs_transport_handle;
struct kafs_security_handle;
struct kafs_connection_handle;
struct kafs_service_handle;
struct kafs_call_handle;

As far as kafs-utils is concerned they're opaque handles.

Note that the interface is expected to invoke the data_ready_func function
passed to ->make_call() when data becomes available.  At that point, the
data_ready_func function will call ->recv_data() to load data into the
supplied buffers.  ->poll_connection() is called to see if there's any data
and potentially to wait for it if there isn't.

The AF_RXRPC transport interface is now separated from core kafs-utils code
and is currently not built or installed by the setup.py script.  This will
need fixing in some manner.

Signed-off-by: David Howells <dhowells@redhat.com>
21 files changed:
Makefile
af_rxrpc/Makefile [new file with mode: 0644]
af_rxrpc/af_rxrpc.c [new file with mode: 0644]
af_rxrpc/af_rxrpc.h [moved from kafs/af_rxrpc.h with 100% similarity]
kafs/argparse.py
kafs/commands/pts/setfields.py
kafs/lib/cell.py
kafs/lib/parse_setrestart_time.py
kafs/main.py
kafs/py_rxconn.c
kafs/py_rxgen.c
kafs/py_rxgen.h
kafs/py_rxsplit.c
kafs/py_rxtrans.c [new file with mode: 0644]
kafs/rxgen.h
kafs/transport.c [moved from kafs/af_rxrpc.c with 57% similarity]
kafs/transport.h [new file with mode: 0644]
rxgen/emit_c_sync_funcs.py
rxgen/emit_py_module.py
rxgen/emit_py_sync_funcs.py
setup.py

index dfbd32dacc9be3c944b3accbf255d07675701a19..6674bc338ebbb0eb6b3935974613d42b3617d4c9 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-build:
+run:
        python3 setup.py build
 
 rpm:
diff --git a/af_rxrpc/Makefile b/af_rxrpc/Makefile
new file mode 100644 (file)
index 0000000..be28d58
--- /dev/null
@@ -0,0 +1,2 @@
+kafs_af_rxrpc.so: af_rxrpc.c af_rxrpc.h Makefile ../kafs/transport.h
+       $(CC) -shared -fPIC -O2 -g -Wall -o $@ af_rxrpc.c -I../kafs
diff --git a/af_rxrpc/af_rxrpc.c b/af_rxrpc/af_rxrpc.c
new file mode 100644 (file)
index 0000000..faebfe7
--- /dev/null
@@ -0,0 +1,675 @@
+/* AF_RXRPC driver
+ *
+ * Copyright (C) 2014 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#define _XOPEN_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <errno.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include "transport.h"
+#include "af_rxrpc.h"
+
+#define RXGEN_CALL_MAGIC       0x52584745U
+#define RXGEN_BUF_MAGIC                (0x52420000U | __LINE__)
+#define RXGEN_BUF_DEAD         (0x6b6bU | __LINE__)
+
+#define debug(fmt, ...) do { if (0) printf("AFRX: "fmt, ## __VA_ARGS__); } while (0)
+
+uint32_t rxgen_dec_padding_sink;
+
+struct usage {
+       int count;
+};
+
+static inline void usage_set(struct usage *usage)
+{
+       usage->count = 1;
+}
+
+static inline void usage_get(struct usage *usage)
+{
+       usage->count++;
+}
+
+static inline int usage_put_and_test(struct usage *usage)
+{
+       return --usage->count == 0;
+}
+
+struct kafs_transport_handle {
+       struct usage usage;
+       struct sockaddr_rxrpc local;
+};
+
+struct kafs_security_handle {
+       struct usage usage;
+       struct kafs_transport_handle *transport;
+       enum kafs_connection_auth_level level;
+       unsigned min_sec_level;
+       unsigned cell_name_len;
+       char cell_name[];
+};
+
+struct kafs_connection_handle {
+       struct usage usage;
+       int fd;
+       struct kafs_transport_handle *transport;
+       struct kafs_security_handle *security;
+       struct sockaddr_rxrpc peer;
+};
+
+struct kafs_call_handle {
+       struct usage usage;
+       struct kafs_connection_handle *conn;
+       kafs_data_ready_func data_ready_func;
+       void *caller_data;
+       int known_to_kernel;
+};
+
+/*
+ * dump the control messages
+ */
+static __attribute__((unused))
+void dump_cmsg(struct msghdr *msg)
+{
+       struct cmsghdr *cmsg;
+       unsigned long user_id;
+       unsigned char *p;
+       int abort_code;
+       int n;
+
+       for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+               n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+               p = CMSG_DATA(cmsg);
+
+               printf("CMSG: %zu: ", cmsg->cmsg_len);
+
+               if (cmsg->cmsg_level == SOL_RXRPC) {
+                       switch (cmsg->cmsg_type) {
+                       case RXRPC_USER_CALL_ID:
+                               printf("RXRPC_USER_CALL_ID: ");
+                               if (n != sizeof(user_id))
+                                       goto dump_data;
+                               memcpy(&user_id, p, sizeof(user_id));
+                               printf("%lx\n", user_id);
+                               continue;
+
+                       case RXRPC_ABORT:
+                               printf("RXRPC_ABORT: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%d\n", abort_code);
+                               continue;
+
+                       case RXRPC_ACK:
+                               printf("RXRPC_ACK");
+                               if (n != 0)
+                                       goto dump_data_colon;
+                               goto print_nl;
+
+                       case RXRPC_RESPONSE:
+                               printf("RXRPC_RESPONSE");
+                               if (n != 0)
+                                       goto dump_data_colon;
+                               goto print_nl;
+
+                       case RXRPC_NET_ERROR:
+                               printf("RXRPC_NET_ERROR: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%s\n", strerror(abort_code));
+                               continue;
+
+                       case RXRPC_BUSY:
+                               printf("RXRPC_BUSY");
+                               if (n != 0)
+                                       goto dump_data_colon;
+                               goto print_nl;
+
+                       case RXRPC_LOCAL_ERROR:
+                               printf("RXRPC_LOCAL_ERROR: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%s\n", strerror(abort_code));
+                               continue;
+
+                       default:
+                               break;
+                       }
+               }
+
+               printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type);
+
+       dump_data_colon:
+               printf(": ");
+       dump_data:
+               printf("{");
+               for (; n > 0; n--, p++)
+                       printf("%02x", *p);
+
+       print_nl:
+               printf("}\n");
+       }
+}
+
+/*
+ * Initialise the transport module
+ */
+static struct kafs_transport_handle *af_rxrpc_init_transport(const struct sockaddr *sa,
+                                                            socklen_t salen)
+{
+       struct kafs_transport_handle *trans;
+       switch (sa->sa_family) {
+       case AF_INET:
+       case AF_INET6:
+               break;
+       default:
+               fprintf(stderr, "Unsupported endpoint type\n");
+               errno = EPROTOTYPE;
+               return NULL;
+       }
+
+       trans = calloc(1, sizeof(*trans));
+       if (!trans)
+               return NULL;
+
+       usage_set(&trans->usage);
+       trans->local.srx_family = AF_RXRPC;
+       trans->local.srx_service = 0;
+       trans->local.transport_type = SOCK_DGRAM;
+       trans->local.transport_len = salen;
+       memcpy(&trans->local.transport, sa, salen);
+
+       return trans;
+}
+
+/*
+ * Free the transport module
+ */
+static void af_rxrpc_free_transport(struct kafs_transport_handle *trans)
+{
+       if (usage_put_and_test(&trans->usage))
+               free(trans);
+}
+
+/*
+ * Set up a security descriptor
+ */
+static struct kafs_security_handle *af_rxrpc_new_security(
+       struct kafs_transport_handle *transport,
+       const char *cell_name,
+       enum kafs_connection_auth_level level)
+{
+       struct kafs_security_handle *security;
+       size_t namelen = strlen(cell_name);
+
+       security = calloc(1, sizeof(*security) + namelen + 1);
+       if (!security)
+               return NULL;
+
+       usage_set(&security->usage);
+       security->transport = transport;
+       security->cell_name_len = namelen;
+       security->level = level;
+       switch (level) {
+       case kafs_connection_no_auth:
+               break;
+       case kafs_connection_local_auth:
+               fprintf(stderr, "Local auth not supported by AF_RXRPC transport\n");
+               errno = -EINVAL;
+               free(security);
+               return NULL;
+       case kafs_connection_clear:
+               security->min_sec_level = RXRPC_SECURITY_PLAIN;
+               break;
+       case kafs_connection_integrity_only:
+               security->min_sec_level = RXRPC_SECURITY_AUTH;
+               break;
+       case kafs_connection_encrypt:
+               security->min_sec_level = RXRPC_SECURITY_ENCRYPT;
+               break;
+       }
+       memcpy(security->cell_name, cell_name, namelen + 1);
+       usage_get(&security->transport->usage);
+       return security;
+}
+
+/*
+ * Free a security descriptor
+ */
+static void af_rxrpc_free_security(struct kafs_security_handle *security)
+{
+       if (usage_put_and_test(&security->usage)) {
+               af_rxrpc_free_transport(security->transport);
+               free(security);
+       }
+}
+
+/*
+ * Set up a new connection
+ */
+static struct kafs_connection_handle *af_rxrpc_open_connection(
+       struct kafs_transport_handle *transport,
+       struct kafs_security_handle *security,
+       const struct sockaddr *sa,
+       socklen_t salen,
+       uint16_t service,
+       int exclusive_connection)
+{
+       struct kafs_connection_handle *conn;
+       int ret;
+
+       if (salen > sizeof(conn->peer.transport)) {
+               errno = EINVAL;
+               return NULL;
+       }
+       
+       conn = calloc(1, sizeof(*conn));
+       if (!conn)
+               return NULL;
+       usage_set(&conn->usage);
+       conn->transport = transport;
+       conn->security = security;
+
+       conn->peer.srx_family = AF_RXRPC;
+       conn->peer.srx_service = service;
+       conn->peer.transport_type = SOCK_DGRAM;
+       conn->peer.transport_len = salen;
+       memcpy(&conn->peer.transport, sa, salen);
+       
+       /* Open up a socket for talking to the AF_RXRPC module */
+       conn->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET);
+       if (conn->fd < 0)
+                       goto error_conn;
+
+       if (exclusive_connection) {
+               ret = setsockopt(conn->fd,
+                                SOL_RXRPC, RXRPC_EXCLUSIVE_CONNECTION,
+                                NULL, 0);
+               if (ret == -1)
+                       goto error_conn;
+       }
+
+       if (security) {
+               ret = setsockopt(conn->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
+                                &security->min_sec_level,
+                                sizeof(security->min_sec_level));
+               if (ret == -1)
+                       goto error_conn;
+
+               ret = setsockopt(conn->fd, SOL_RXRPC, RXRPC_SECURITY_KEY,
+                                security->cell_name, security->cell_name_len);
+               if (ret == -1)
+                       goto error_conn;
+       }
+
+       /* Bind an address to the local endpoint */
+       ret = bind(conn->fd, (struct sockaddr *)&conn->transport->local,
+                  sizeof(conn->transport->local));
+       if (ret < 0)
+               goto error_fd;
+
+       usage_get(&security->usage);
+       usage_get(&transport->usage);
+       return conn;
+
+error_fd:
+       close(conn->fd);
+error_conn:
+       free(conn);
+       return NULL;
+}
+
+/*
+ * Close an RxRPC client connection.  This will cause all outstanding
+ * operations to be aborted by the kernel..
+ */
+static void af_rxrpc_close_connection(struct kafs_connection_handle *conn)
+{
+       if (usage_put_and_test(&conn->usage)) {
+               close(conn->fd);
+               af_rxrpc_free_security(conn->security);
+               af_rxrpc_free_transport(conn->transport);
+               free(conn);
+       }
+}
+
+/*
+ * Allocate a call handle for a new call.
+ */
+static struct kafs_call_handle *af_rxrpc_make_call(
+       struct kafs_connection_handle *conn,
+       kafs_data_ready_func data_ready_func,
+       void *caller_data)
+{
+       struct kafs_call_handle *call;
+
+       call = calloc(1, sizeof(*call));
+       if (!call)
+               return NULL;
+
+       usage_set(&call->usage);
+       call->conn = conn;
+       call->data_ready_func = data_ready_func;
+       call->caller_data = caller_data;
+       usage_get(&conn->usage);
+       return call;
+}
+
+/*
+ * Send buffered data.
+ */
+static int af_rxrpc_send_data(struct kafs_call_handle *call,
+                             struct iovec *iov,
+                             int ioc,
+                             int more)
+{
+       struct msghdr msg;
+       size_t ctrllen;
+       unsigned char control[128];
+       int ret, i;
+
+       /* Request an operation */
+       ctrllen = 0;
+       RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
+
+       msg.msg_name            = &call->conn->peer;
+       msg.msg_namelen         = sizeof(call->conn->peer);
+       msg.msg_iov             = iov;
+       msg.msg_iovlen          = ioc;
+       msg.msg_control         = control;
+       msg.msg_controllen      = ctrllen;
+       msg.msg_flags           = 0;
+
+       if (more)
+               more = MSG_MORE;
+
+       //dump_cmsg(&msg);
+       for (i = 0; i < msg.msg_iovlen; i++)
+               debug("IOV[%02u] %04zu %p\n",
+                     i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base);
+
+       /* Send the data */
+       ret = sendmsg(call->conn->fd, &msg, more);
+       debug("SENDMSG: %d%s\n", ret, more ? " [more]" : "");
+       if (ret >= 0)
+               call->known_to_kernel = 1;
+       return ret;
+}
+
+/*
+ * Receive data from a connection and pass it to the appropriate call.
+ */
+static int af_rxrpc_recv_mux(struct kafs_connection_handle *conn)
+{
+       struct kafs_call_handle *call;
+       struct sockaddr_rxrpc srx;
+       struct cmsghdr *cmsg;
+       struct msghdr msg;
+       struct iovec iov[1];
+       unsigned char control[128];
+       uint32_t tmpbuf[1];
+       int ret;
+
+       for (;;) {
+               /* Peek at the next message */
+               iov[0].iov_base = &tmpbuf;
+               iov[0].iov_len = sizeof(tmpbuf);
+
+               memset(&msg, 0, sizeof(msg));
+               msg.msg_iov     = iov;
+               msg.msg_iovlen  = 1;
+               msg.msg_name    = &srx;
+               msg.msg_namelen = sizeof(srx);
+               msg.msg_control = control;
+               msg.msg_controllen = sizeof(control);
+               msg.msg_flags   = 0;
+
+               ret = recvmsg(conn->fd, &msg, MSG_PEEK | MSG_DONTWAIT);
+               debug("RECVMSG PEEK: %d\n", ret);
+               if (ret == -1) {
+                       if (errno == EAGAIN ||
+                           errno == ENODATA)
+                               return 0;
+                       return -1;
+               }
+               //dump_cmsg(&msg);
+
+               /* Find the call ID. */
+               call = NULL;
+               for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+                       unsigned char *p;
+
+                       if (cmsg->cmsg_level != SOL_RXRPC ||
+                           cmsg->cmsg_type != RXRPC_USER_CALL_ID)
+                               continue;
+
+                       p = CMSG_DATA(cmsg);
+                       call = *(struct kafs_call_handle **)p;
+                       goto found_call;
+               }
+               abort();
+
+       found_call:
+               if (!call->known_to_kernel) {
+                       fprintf(stderr, "Unexpected packet for dead call\n");
+                       abort();
+               }
+               call->data_ready_func(call, call->caller_data);
+       }
+}
+
+/*
+ * Receive data from a transport.
+ */
+static int af_rxrpc_recv_data(struct kafs_call_handle *call,
+                             struct iovec *iov,
+                             int ioc,
+                             enum kafs_recv_condition *_cond,
+                             uint32_t *_supplementary_info)
+{
+       struct sockaddr_rxrpc srx;
+       struct cmsghdr *cmsg;
+       struct msghdr msg;
+       unsigned char control[128];
+       enum kafs_recv_condition cond;
+       uint32_t supplementary_info;
+       int ret;
+
+       memset(&msg, 0, sizeof(msg));
+       msg.msg_iov     = iov;
+       msg.msg_iovlen  = ioc;
+       msg.msg_name    = &srx;
+       msg.msg_namelen = sizeof(srx);
+       msg.msg_control = control;
+       msg.msg_controllen = sizeof(control);
+       msg.msg_flags   = 0;
+
+       ret = recvmsg(call->conn->fd, &msg, MSG_DONTWAIT);
+       debug("RECVMSG: %d\n", ret);
+       if (ret == -1)
+               return -1;
+
+       debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags);
+       debug("CMSG: %zu\n", msg.msg_controllen);
+       debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len);
+
+       /* Process the metadata */
+       supplementary_info = 0;
+       if (msg.msg_flags & MSG_MORE)
+               cond = kafs_recv_more;
+       else if (msg.msg_flags & MSG_EOR)
+               cond = kafs_recv_complete;
+       else
+               cond = kafs_recv_complete_now_send;
+
+       if (msg.msg_flags & MSG_EOR) {
+               debug("Detected EOR\n");
+               call->known_to_kernel = 0;
+       }
+       
+       for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+               unsigned char *p;
+               int n;
+
+               if (cmsg->cmsg_level != SOL_RXRPC)
+                       continue;
+
+               n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+               p = CMSG_DATA(cmsg);
+
+               switch (cmsg->cmsg_type) {
+               case RXRPC_USER_CALL_ID:
+                       if (*(struct kafs_call_handle **)p != call) {
+                               fprintf(stderr, "Message queue changed from MSG_PEEK\n");
+                               abort();
+                       }
+                       break;
+
+               case RXRPC_ABORT:
+                       if (n == 4)
+                               memcpy(&supplementary_info, p, 4);
+                       errno = ECONNABORTED;
+                       cond = kafs_recv_abort;
+                       break;
+
+               case RXRPC_NET_ERROR:
+                       if (n != sizeof(ret)) {
+                               errno = EBADMSG;
+                               return -1;
+                       }
+                       memcpy(&supplementary_info, p, 4);
+                       cond = kafs_recv_net_error;
+                       break;
+
+               case RXRPC_LOCAL_ERROR:
+                       if (n != sizeof(ret)) {
+                               errno = EBADMSG;
+                               return -1;
+                       }
+                       memcpy(&supplementary_info, p, 4);
+                       cond = kafs_recv_local_error;
+                       break;
+
+               case RXRPC_BUSY:
+                       cond = kafs_recv_busy;
+                       break;
+
+               case RXRPC_ACK:
+                       cond = kafs_recv_final_ack;
+                       break;
+
+               case RXRPC_RESPONSE:
+                       break;
+
+               default:
+                       fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type);
+                       break;
+               }
+       }
+
+       *_cond = cond;
+       *_supplementary_info = supplementary_info;
+       debug("<--af_rxrpc_recv_data() = %d [c=%u s=%d]\n",
+             ret, cond, supplementary_info);
+       return ret;
+}
+
+/*
+ * Abort a call.
+ */
+static void af_rxrpc_abort_call(struct kafs_call_handle *call, uint32_t abort_code)
+{
+       struct msghdr msg;
+       size_t ctrllen;
+       unsigned char control[128];
+
+       memset(control, 0, sizeof(control));
+       ctrllen = 0;
+       RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
+       RXRPC_ADD_ABORT(control, ctrllen, abort_code);
+
+       msg.msg_name            = &call->conn->peer;
+       msg.msg_namelen         = sizeof(call->conn->peer);
+       msg.msg_iov             = NULL;
+       msg.msg_iovlen          = 0;
+       msg.msg_control         = control;
+       msg.msg_controllen      = ctrllen;
+       msg.msg_flags           = 0;
+
+       sendmsg(call->conn->fd, &msg, 0);
+}
+
+/*
+ * Terminate a call.
+ */
+static void af_rxrpc_terminate_call(struct kafs_call_handle *call)
+{
+       if (usage_put_and_test(&call->usage)) {
+               af_rxrpc_close_connection(call->conn);
+               free(call);
+       }
+}
+
+/*
+ * Poll a connection to see if it has data available.
+ */
+static int af_rxrpc_poll_connection(struct kafs_connection_handle *conn, int nowait)
+{
+       struct pollfd fds[1];
+       int ret;
+
+       debug("-->poll()\n");
+       
+       fds[0].fd = conn->fd;
+       fds[0].events = POLLIN;
+       fds[0].revents = 0;
+
+       ret = poll(fds, 1, nowait ? 0 : -1);
+
+       if (ret == -1) {
+               fprintf(stderr, "Poll failed: %m\n");
+               return -1;
+       }
+       if (ret > 0 && af_rxrpc_recv_mux(conn)) {
+               fprintf(stderr, "rxrpc_recv_mux failed: %m\n");
+               return -1;
+       }
+
+       return 0;
+}
+
+struct kafs_transport kafs_transport = {
+       .id                     = kafs_transport_id,
+       .name                   = "AF_RXRPC",
+       .init_transport         = af_rxrpc_init_transport,
+       .free_transport         = af_rxrpc_free_transport,
+       .new_security           = af_rxrpc_new_security,
+       .free_security          = af_rxrpc_free_security,
+       .open_connection        = af_rxrpc_open_connection,
+       .close_connection       = af_rxrpc_close_connection,
+       .make_call              = af_rxrpc_make_call,
+       .new_service            = NULL,
+       .shutdown_service       = NULL,
+       .accept_call            = NULL,
+       .abort_call             = af_rxrpc_abort_call,
+       .terminate_call         = af_rxrpc_terminate_call,
+       .send_data              = af_rxrpc_send_data,
+       .recv_data              = af_rxrpc_recv_data,
+       .poll_connection        = af_rxrpc_poll_connection,
+};
similarity index 100%
rename from kafs/af_rxrpc.h
rename to af_rxrpc/af_rxrpc.h
index be6f350ddf0f402c977e3a29f1e8e359aaf248e5..66f44d1ec745f04c6b84005424b38a7a0c90a1bc 100644 (file)
@@ -16,74 +16,74 @@ as published by the Free Software Foundation; either version
 from kafs.exception import AFSArgumentError, AFSHelpFlag
 from kafs.lib.output import set_verbosity
 
-def get_cell(switch, params):
+def get_cell(switch, params, transport):
     from kafs.lib.cell import cell
-    return cell(params[0])
+    return cell(transport, params[0])
 
-def get_bosserver(switch, params):
+def get_bosserver(switch, params, transport):
     from kafs.lib.bosserver import bosserver
     return bosserver(params[0])
 
-def get_fileserver(switch, params):
+def get_fileserver(switch, params, transport):
     from kafs.lib.fileserver import fileserver
     return fileserver(params[0])
 
-def get_volserver(switch, params):
+def get_volserver(switch, params, transport):
     from kafs.lib.volserver import volserver
     return volserver(params[0])
 
-def get_vlservers(switch, params):
+def get_vlservers(switch, params, transport):
     from kafs.lib.vlserver import vlserver
     servers = []
     for i in params:
         servers.append(vlserver(params[0]))
     return servers
 
-def get_volume_name(switch, params):
+def get_volume_name(switch, params, transport):
     return params[0]
 
-def get_volume_names(switch, params):
+def get_volume_names(switch, params, transport):
     return params
 
-def get_machine_name(switch, params):
+def get_machine_name(switch, params, transport):
     return params[0]
 
-def get_machine_names(switch, params):
+def get_machine_names(switch, params, transport):
     return params
 
-def get_path_name(switch, params):
+def get_path_name(switch, params, transport):
     return params[0]
 
-def get_path_names(switch, params):
+def get_path_names(switch, params, transport):
     return params
 
-def get_file_name(switch, params):
+def get_file_name(switch, params, transport):
     return params[0]
 
-def get_file_names(switch, params):
+def get_file_names(switch, params, transport):
     return params
 
-def get_partition_id(switch, params):
+def get_partition_id(switch, params, transport):
     from kafs.lib.partition import part2id
     return part2id(params[0])
 
-def get_auth(switch, params):
+def get_auth(switch, params, transport):
     return params
 
-def get_uuid(switch, params):
+def get_uuid(switch, params, transport):
     from kafs.lib.uuid import str2uuid
     return str2uuid(params[0])
 
-def get_string(switch, params):
+def get_string(switch, params, transport):
     return params[0]
 
-def get_strings(switch, params):
+def get_strings(switch, params, transport):
     return params
 
-def get_dummy(switch, params):
+def get_dummy(switch, params, transport):
     return params
 
-def get_verbose(switch, params):
+def get_verbose(switch, params, transport):
     set_verbosity()
     return params
 
@@ -94,10 +94,10 @@ def do_get_id(i):
         raise AFSArgumentError("UID/GID identifier is not numeric")
     return int(i)
 
-def get_id(switch, params):
+def get_id(switch, params, transport):
     return do_get_id(params[0])
 
-def get_ids(switch, params):
+def get_ids(switch, params, transport):
     ids = []
     for i in params:
         ids.append(do_get_id(i))
@@ -110,10 +110,10 @@ def do_get_uid(i):
         raise AFSArgumentError("UID identifier is a positive integer")
     return int(i)
 
-def get_uid(switch, params):
+def get_uid(switch, params, transport):
     return do_get_uid(params[0])
 
-def get_uids(switch, params):
+def get_uids(switch, params, transport):
     ids = []
     for i in params:
         ids.append(do_get_uid(i))
@@ -126,10 +126,10 @@ def do_get_gid(i):
         raise AFSArgumentError("GID identifier is a positive integer")
     return int(i)
 
-def get_gid(switch, params):
+def get_gid(switch, params, transport):
     return do_get_gid(params[0])
 
-def get_gids(switch, params):
+def get_gids(switch, params, transport):
     ids = []
     for i in params:
         ids.append(do_get_gid(i))
@@ -185,7 +185,7 @@ def get_gids(switch, params):
 #
 ###############################################################################
 def parse_arguments(args, available_arguments, argument_size_limits,
-                    cant_combine_arguments):
+                    cant_combine_arguments, transport):
     result = {}
     need_switch = False
     i = 0       # Given arguments index
@@ -285,7 +285,7 @@ def parse_arguments(args, available_arguments, argument_size_limits,
         # Call the syntax checker
         syntax = match[1]
         result["raw." + match[0]] = params
-        result[match[0]] = syntax(match[0], params)
+        result[match[0]] = syntax(match[0], params, transport)
 
     # Check for missing required arguments
     for j in available_arguments:
index b1757ade7528e1b1c3b83734328f4bf23e1ec2b3..3215c591f3e39f2490c7ffb674f920fab58ed768 100644 (file)
@@ -29,7 +29,7 @@ import sys
 
 help = "Set privacy flags or quota for a Protection Database entry"
 
-def get_privacy_flags(switch, params):
+def get_privacy_flags(switch, params, transport):
     s = params[0]
     if len(s) != 5:
         raise AFSArgumentError("Privacy flag string must be exactly 5 characters")
index add842a274affdea31e00fb4db8385f4297cd7a9..b3fa9719b1b67e8b7d4ead32a27d9d49e3c4d5d1 100644 (file)
@@ -36,7 +36,7 @@ class CellError(AFSException):
 class cell:
     """Represents an AFS cell.  We hold the cell name here and the list of
     VL servers once we've looked it up"""
-    def __init__(self, name = None):
+    def __init__(self, transport, name = None):
         if name == None:
             name = linecache.getline("/etc/afs/ThisCell", 1)
             name = name.strip()
@@ -48,6 +48,7 @@ class cell:
 
         verbose("New Cell ", name, "\n")
         self.__name = name
+        self.__transport = transport
         self.__looked_up = False
         self.__vlserver_names = dict()
         self.__vlservers = []
@@ -178,7 +179,8 @@ class cell:
         for vladdr in self.query_vl_addrs():
             verbose("Trying vlserver ", vladdr, "\n")
 
-            z_conn = rpc.rx_new_connection(str(vladdr), rpc.VL_PORT, rpc.VL_SERVICE,
+            z_conn = rpc.rx_new_connection(self.__transport,
+                                           str(vladdr), rpc.VL_PORT, rpc.VL_SERVICE,
                                            key, security)
             try:
                 ret = rpc.VL_Probe(z_conn)
@@ -197,7 +199,8 @@ class cell:
         key, security = self.determine_security(params)
 
         verbose("Trying volserver ", server.addr(), "\n")
-        vol_conn = rpc.rx_new_connection(str(server.addr()),
+        vol_conn = rpc.rx_new_connection(self.__transport,
+                                         str(server.addr()),
                                          rpc.VOLSERVICE_PORT,
                                          rpc.VOLSERVICE_ID,
                                          key, security)
@@ -208,7 +211,8 @@ class cell:
         key, security = self.determine_security(params)
 
         verbose("Trying bosserver ", server.addr(), "\n")
-        bos_conn = rpc.rx_new_connection(str(server.addr()),
+        bos_conn = rpc.rx_new_connection(self.__transport,
+                                         str(server.addr()),
                                          rpc.BOSSERVICE_PORT,
                                          rpc.BOSSERVICE_ID,
                                          key, security)
@@ -228,7 +232,8 @@ class cell:
 
         verbose("Trying ptserver ", server, "\n")
 
-        pt_conn = rpc.rx_new_connection(str(server),
+        pt_conn = rpc.rx_new_connection(self.__transport,
+                                        str(server),
                                         rpc.PR_PORT,
                                         rpc.PR_SERVICE,
                                         key, security)
index b3282cdc438832d96027b35fe222908af26877e6..8db11317211396de87cb80c9bd8e29912d6a1ca5 100644 (file)
@@ -31,7 +31,7 @@ days = [ "sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "satu
 # Parse the bos setrestart -time argument
 #
 ###############################################################################
-def parse_restart_time(switch, params):
+def parse_restart_time(switch, params, transport):
     at = rpc.bozo_netKTime()
 
     time = params[0].strip().lower()
index a4903264a34d346706a41d7157b369106131b67a..8e2b641b6bc24c20717db1ea90619700f0ce7da4 100644 (file)
@@ -1,5 +1,5 @@
 #
-# AFS Toolset command switcher 
+# AFS Toolset command switcher
 # -*- coding: utf-8 -*-
 #
 
@@ -30,6 +30,7 @@ import sys, os, traceback
 import kafs.commands
 from kafs.lib.output import *
 from kafs.exception import AFSArgumentError, AFSHelpFlag
+import kafs.rpc as rpc
 
 ###############################################################################
 #
@@ -124,12 +125,19 @@ def _main():
     else:
         argument_size_limits = {}
 
+    # Load the transport module
+    rpc.load_kafs_transport("/usr/local/lib64/kafs_af_rxrpc.so")
+
+    # Set up a transport
+    transport = rpc.rx_new_transport()
+
     # Parse the parameters
     try:
         params = kafs.argparse.parse_arguments(sys.argv[1:],
-                                              command.command_arguments,
-                                              argument_size_limits,
-                                              cant_combine_arguments)
+                                               command.command_arguments,
+                                               argument_size_limits,
+                                               cant_combine_arguments,
+                                               transport)
     except AFSArgumentError as e:
         print(prog + ":", e, file=sys.stderr)
         sys.exit(2)
@@ -141,7 +149,7 @@ def _main():
     # Stick in the default cell if there isn't one
     if "cell" not in params:
         from kafs.lib.cell import cell
-        params["cell"] = cell()
+        params["cell"] = cell(transport)
 
     params["_prog"] = prog
     params["_cmdsetmod"] = cmdsetmod
index 604eb18674ae2735f206bfc527d0982ae285305c..0a4231196174ac44b77b556c8473e25c0cf07dff 100644 (file)
@@ -105,7 +105,8 @@ PyObject *
 kafs_py_rx_new_connection(PyObject *_self, PyObject *args)
 {
        struct py_rx_connection *obj;
-       struct rx_connection *z_conn;
+       struct py_rx_transport *trans;
+       struct rx_connection *conn;
        union {
                struct sockaddr sa;
                struct sockaddr_in sin;
@@ -116,7 +117,8 @@ kafs_py_rx_new_connection(PyObject *_self, PyObject *args)
        uint16_t port, service, local_port = 0, local_service = 0;
        int exclusive = 0, security = 0;
 
-       if (!PyArg_ParseTuple(args, "sHHzi|HHp",
+       if (!PyArg_ParseTuple(args, "O!sHHzi|HHp",
+                             &py_rx_transportType, &trans,
                              &address, &port, &service, &key, &security,
                              &local_port, &local_service, &exclusive))
                return NULL;
@@ -141,14 +143,13 @@ kafs_py_rx_new_connection(PyObject *_self, PyObject *args)
        py_rx_connection_init((PyObject *)obj, NULL, NULL);
        assert(obj->x == NULL);
 
-       z_conn = rx_new_connection(&sa.sa, salen, service,
-                                  local_port, local_service, exclusive,
-                                  key, security);
-       if (!z_conn) {
+       conn = rx_new_connection(trans->x, &sa.sa, salen, service,
+                                  exclusive, key, security);
+       if (!conn) {
                Py_DECREF(obj);
                return errno == ENOMEM ? PyExc_MemoryError :
                        PyErr_SetFromErrno(PyExc_IOError);
        }
-       obj->x = z_conn;
+       obj->x = conn;
        return (PyObject *)obj;
 }
index 27ed55338ebfcc70176a2b0b226f8b254cf7f883..820f6cd0288bb031d317267f0989ca14134d60f3 100644 (file)
@@ -1152,11 +1152,14 @@ int py_dec_into_string(struct rx_call *call)
 
        needed = call->blob_size - call->blob_offset;
        segment = call->data_stop - call->data_cursor;
-       debug("DEC STR dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment);
+       debug("DEC STR dc=%u bsize=%u-%u seg=%lu-%lu\n",
+             call->data_count, call->blob_offset, call->blob_size,
+             call->data_cursor - call->data_start, call->data_stop - call->data_start);
 
        if (segment > 0) {
                if (segment > needed)
                        segment = needed;
+               debug("DEC STR - seg %u\n", segment);
                if (call->blob != &rxgen_dec_padding_sink) {
                        for (i = 0; i < segment; i++)
                                PyUnicode_WRITE(PyUnicode_KIND(str), PyUnicode_DATA(str),
@@ -1165,9 +1168,14 @@ int py_dec_into_string(struct rx_call *call)
                call->blob_decoded += segment;
                call->blob_offset += segment;
                call->data_cursor += segment;
-               if (call->blob_size <= call->blob_offset) {
+               if (call->blob_offset >= call->blob_size) {
+                       if (call->blob_offset > call->blob_size) {
+                               debug("DEC STR buffer overrun\n");
+                               abort();
+                       }
                        if (call->blob != &rxgen_dec_padding_sink && call->padding_size > 0) {
                                /* Soak up the padding to a 32-bit boundary */
+                               debug("DEC STR padding\n");
                                call->blob = &rxgen_dec_padding_sink;
                                call->blob_size = call->padding_size;
                                call->blob_offset = 0;
@@ -1188,8 +1196,6 @@ int py_dec_init_string(struct rx_call *call, PyObject **_str)
 {
        PyObject *str;
 
-       debug("INIT STRING %u\n", call->blob_size);
-
        str = PyUnicode_New(call->blob_size, 255);
        if (!str)
                return -1;
@@ -1205,6 +1211,7 @@ int py_dec_init_string(struct rx_call *call, PyObject **_str)
        call->blob = str;
        call->blob_offset = 0;
        call->padding_size = calc_pad_to_4(call->blob_size);
+       debug("INIT STRING %u,%u\n", call->blob_size, call->padding_size);
        return 1;
 }
 
index 25ccb61bce83b500b65262a24ae4db20a14acbb0..700568d112276d4dd9376051df15823b6ff64416 100644 (file)
 
 #include "rxgen.h"
 
+struct py_rx_transport {
+       PyObject_HEAD
+       struct kafs_transport_handle *x;
+};
+
 struct py_rx_connection {
        PyObject_HEAD
        struct rx_connection *x;
@@ -50,8 +55,11 @@ struct py_rx_split_info {
        bool receiving_data;
 };
 
+extern PyTypeObject py_rx_transportType;
 extern PyTypeObject py_rx_connectionType;
 
+extern PyObject *kafs_py_load_kafs_transport(PyObject *, PyObject *);
+extern PyObject *kafs_py_rx_new_transport(PyObject *, PyObject *);
 extern PyObject *kafs_py_rx_new_connection(PyObject *, PyObject *);
 extern PyObject *kafs_py_string_to_key(PyObject *, PyObject *);
 
index ef225e11d4badb939f07f16d7fe196cc1053a427..a659e795ba078c738daf7c0a0d341d40c1896ec7 100644 (file)
@@ -11,7 +11,6 @@
 
 #include <Python.h>
 #include "structmember.h"
-#include <poll.h>
 #include <arpa/inet.h>
 #include <assert.h>
 #include "py_rxgen.h"
 static int py_rx_split_do_recv(struct rx_call *call,
                               struct py_rx_split_info *split_info)
 {
-       struct pollfd fds[1];
-
-       fds[0].fd = call->conn->fd;
-       fds[0].events = POLLIN;
-       fds[0].revents = 0;
-       poll(fds, 1, 0);
-       if (fds[0].revents & POLLIN)
-               rxrpc_recv_data(call->conn, true);
+       rxrpc_maybe_recv_data(call->conn);
        return 0;
 }
 
diff --git a/kafs/py_rxtrans.c b/kafs/py_rxtrans.c
new file mode 100644 (file)
index 0000000..69b2807
--- /dev/null
@@ -0,0 +1,154 @@
+/* Python RxRPC transport container object
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <Python.h>
+#include "structmember.h"
+#include <arpa/inet.h>
+#include "py_rxgen.h"
+#include "rxgen.h"
+
+/*
+ * RxRPC transport container.
+ */
+static int
+py_rx_transport_init(PyObject *_self, PyObject *args, PyObject *kwds)
+{
+       struct py_rx_transport *self = (struct py_rx_transport *)_self;
+       self->x = NULL;
+       return 0;
+}
+
+static void
+py_rx_transport_dealloc(struct py_rx_transport *self)
+{
+       if (self->x) {
+               rx_free_transport(self->x);
+               self->x = NULL;
+       }
+       Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+PyTypeObject py_rx_transportType = {
+       PyVarObject_HEAD_INIT(NULL, 0)
+       "kafs.rx_transport",            /*tp_name*/
+       sizeof(struct py_rx_transport), /*tp_basicsize*/
+       0,                              /*tp_itemsize*/
+       (destructor)py_rx_transport_dealloc, /*tp_dealloc*/
+       0,                              /*tp_print*/
+       0,                              /*tp_getattr*/
+       0,                              /*tp_setattr*/
+       0,                              /*tp_compare*/
+       0,                              /*tp_repr*/
+       0,                              /*tp_as_number*/
+       0,                              /*tp_as_sequence*/
+       0,                              /*tp_as_mapping*/
+       0,                              /*tp_hash */
+       0,                              /*tp_call*/
+       0,                              /*tp_str*/
+       0,                              /*tp_getattro*/
+       0,                              /*tp_setattro*/
+       0,                              /*tp_as_buffer*/
+       Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/
+       "RxRPC transport container",    /* tp_doc */
+       0,                              /* tp_traverse */
+       0,                              /* tp_clear */
+       0,                              /* tp_richcompare */
+       0,                              /* tp_weaklistoffset */
+       0,                              /* tp_iter */
+       0,                              /* tp_iternext */
+       0,                              /* tp_methods */
+       0,                              /* tp_members */
+       0,                              /* tp_getset */
+       0,                              /* tp_base */
+       0,                              /* tp_dict */
+       0,                              /* tp_descr_get */
+       0,                              /* tp_descr_set */
+       0,                              /* tp_dictoffset */
+       py_rx_transport_init,           /* tp_init */
+       0,                              /* tp_alloc */
+       0,                              /* tp_new */
+};
+
+/*
+ * Set up an RxRPC transport.
+ */
+PyObject *
+kafs_py_rx_new_transport(PyObject *_self, PyObject *args)
+{
+       struct py_rx_transport *obj;
+       struct kafs_transport_handle *trans;
+       union {
+               struct sockaddr sa;
+               struct sockaddr_in sin;
+               struct sockaddr_in6 sin6;
+       } sa;
+       const char *local_address = NULL;
+       socklen_t salen;
+       uint16_t local_port = 0;
+
+       if (!PyArg_ParseTuple(args, "|Hs", &local_port, &local_address))
+               return NULL;
+
+       memset(&sa, 0, sizeof(sa));
+       if (!local_address && !local_port) {
+               sa.sin.sin_family = AF_INET;
+               salen = sizeof(sa.sin);
+       } else if (!local_address) {
+               sa.sin.sin_family = AF_INET;
+               sa.sin.sin_port = htons(local_port);
+               salen = sizeof(sa.sin);
+       } else if (inet_pton(AF_INET, local_address, &sa.sin.sin_addr)) {
+               sa.sin.sin_family = AF_INET;
+               sa.sin.sin_port = htons(local_port);
+               salen = sizeof(sa.sin);
+       } else if (inet_pton(AF_INET6, local_address, &sa.sin.sin_addr)) {
+               sa.sin6.sin6_family = AF_INET6;
+               sa.sin6.sin6_port = htons(local_port);
+               salen = sizeof(sa.sin6);
+       } else {
+               return PyErr_Format(PyExc_RuntimeError,
+                                   "Unsupported network address '%s'", local_address);
+       }
+
+       obj = (struct py_rx_transport *)_PyObject_New(&py_rx_transportType);
+       if (!obj)
+               return PyExc_MemoryError;
+       py_rx_transport_init((PyObject *)obj, NULL, NULL);
+       assert(obj->x == NULL);
+
+       trans = rx_new_transport(&sa.sa, salen);
+       if (!trans) {
+               Py_DECREF(obj);
+               return errno == ENOMEM ? PyExc_MemoryError :
+                       PyErr_SetFromErrno(PyExc_IOError);
+       }
+       obj->x = trans;
+       return (PyObject *)obj;
+}
+
+/*
+ * Load a kAFS transport module.
+ */
+PyObject *
+kafs_py_load_kafs_transport(PyObject *_self, PyObject *args)
+{
+       const char *name = NULL;
+       int ret;
+
+       if (!PyArg_ParseTuple(args, "s", &name))
+               return NULL;
+
+       ret = load_kafs_transport_module(name);
+       if (ret < 0)
+               return errno == ENOMEM ? PyExc_MemoryError :
+                       PyErr_SetFromErrno(PyExc_IOError);
+       Py_RETURN_TRUE;
+}
index eeb123d5d16768d98b2c5b86b776c6ddd00c4eac..3096143c52cdb07ab0fd8cc68f2b9d1e726edf50 100644 (file)
 #ifndef _RXGEN_H
 #define _RXGEN_H
 
-#include "af_rxrpc.h"
+#include "transport.h"
 #include <stdbool.h>
 #include <errno.h>
 #include <stdlib.h>
+#include <arpa/inet.h>
 
 typedef uint32_t net_xdr_t;
 
 struct rx_connection {
-       struct sockaddr_rxrpc peer;
+       struct kafs_connection_handle *handle;
        uint32_t        last_abort_code;
-       int fd;
 };
 
 #define RXGEN_BUFFER_SIZE      1024
@@ -61,7 +61,10 @@ enum rx_call_state {
 
 struct rx_call {
        uint32_t        magic;
+       struct kafs_call_handle *handle;
        struct rx_connection *conn;
+       struct sockaddr sa;
+       socklen_t       sa_len;
        enum rx_call_state state;
        unsigned        known_to_kernel : 1;
        unsigned        secured : 1;
@@ -184,23 +187,29 @@ static inline int rxrpc_post_dec(struct rx_call *call)
 extern void rxrpc_dec_advance_buffer(struct rx_call *call);
 extern int rxgen_dec_discard_excess(struct rx_call *call);
 
-extern struct rx_connection *rx_new_connection(const struct sockaddr *sa,
+extern int load_kafs_transport_module(const char *name);
+
+extern struct kafs_transport_handle *rx_new_transport(const struct sockaddr *local_sa,
+                                                     socklen_t local_salen);
+extern void rx_free_transport(struct kafs_transport_handle *trans);
+
+extern struct rx_connection *rx_new_connection(struct kafs_transport_handle *trans,
+                                              const struct sockaddr *sa,
                                               socklen_t salen,
                                               uint16_t service,
-                                              uint16_t local_port,
-                                              uint16_t local_service,
                                               int exclusive,
-                                              const char *key,
-                                              int security);
+                                              const char *cellname,
+                                              enum kafs_connection_auth_level level);
 
-extern void rx_close_connection(struct rx_connection *z_conn);
+extern void rx_close_connection(struct rx_connection *conn);
 
-extern struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn, int incoming_call);
+extern struct rx_call *rxrpc_make_call(struct rx_connection *conn);
 extern void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code);
 extern void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code);
 
 extern int rxrpc_send_data(struct rx_call *call);
-extern int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait);
+extern int rxrpc_recv_data(struct rx_connection *conn, bool nowait);
+extern int rxrpc_maybe_recv_data(struct rx_connection *conn);
 extern int rxrpc_run_sync_call(struct rx_call *call);
 
 #endif /* _RXGEN_H */
similarity index 57%
rename from kafs/af_rxrpc.c
rename to kafs/transport.c
index 9ec8a4d05e10306bc2944000d3d07ab235308625..e6da4c3081063dadf30116dd2213e8c95390755e 100644 (file)
@@ -1,6 +1,6 @@
-/* AF_RXRPC driver
+/* Dynamically load the Rx transport module
  *
- * Copyright (C) 2014 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
  * Written by David Howells (dhowells@redhat.com)
  *
  * This program is free software; you can redistribute it and/or
  * 2 of the Licence, or (at your option) any later version.
  */
 
-#define _XOPEN_SOURCE
+#include <dlfcn.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <unistd.h>
-#include <poll.h>
 #include <errno.h>
 #include <limits.h>
-#include <sys/socket.h>
-#include "af_rxrpc.h"
+#include <netinet/in.h>
+#include "transport.h"
 #include "rxgen.h"
 
 #define RXGEN_CALL_MAGIC       0x52584745U
 #define RXGEN_BUF_MAGIC                (0x52420000U | __LINE__)
 #define RXGEN_BUF_DEAD         (0x6b6bU | __LINE__)
 
-#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+#define debug(fmt, ...) do { if (0) printf("TRAN: "fmt, ## __VA_ARGS__); } while (0)
 
 uint32_t rxgen_dec_padding_sink;
 
+static struct kafs_transport *transport;
+static char transport_id[] = kafs_transport_id;
+
+static void rxrpc_data_ready(struct kafs_call_handle *call_handle, void *caller_data);
+
 /*
- * dump the control messages
+ * Load a transport module and extract the entry points
  */
-static __attribute__((unused))
-void dump_cmsg(struct msghdr *msg)
+int load_kafs_transport_module(const char *name)
 {
-       struct cmsghdr *cmsg;
-       unsigned long user_id;
-       unsigned char *p;
-       int abort_code;
-       int n;
-
-       for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
-               n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
-               p = CMSG_DATA(cmsg);
-
-               printf("CMSG: %zu: ", cmsg->cmsg_len);
-
-               if (cmsg->cmsg_level == SOL_RXRPC) {
-                       switch (cmsg->cmsg_type) {
-                       case RXRPC_USER_CALL_ID:
-                               printf("RXRPC_USER_CALL_ID: ");
-                               if (n != sizeof(user_id))
-                                       goto dump_data;
-                               memcpy(&user_id, p, sizeof(user_id));
-                               printf("%lx\n", user_id);
-                               continue;
+       void *trans;
+       char *err;
 
-                       case RXRPC_ABORT:
-                               printf("RXRPC_ABORT: ");
-                               if (n != sizeof(abort_code))
-                                       goto dump_data;
-                               memcpy(&abort_code, p, sizeof(abort_code));
-                               printf("%d\n", abort_code);
-                               continue;
+       trans = dlopen(name, RTLD_LAZY);
+       if (!trans) {
+               err = dlerror();
+               goto error;
+       }
 
-                       case RXRPC_ACK:
-                               printf("RXRPC_ACK");
-                               if (n != 0)
-                                       goto dump_data_colon;
-                               goto print_nl;
-
-                       case RXRPC_RESPONSE:
-                               printf("RXRPC_RESPONSE");
-                               if (n != 0)
-                                       goto dump_data_colon;
-                               goto print_nl;
-
-                       case RXRPC_NET_ERROR:
-                               printf("RXRPC_NET_ERROR: ");
-                               if (n != sizeof(abort_code))
-                                       goto dump_data;
-                               memcpy(&abort_code, p, sizeof(abort_code));
-                               printf("%s\n", strerror(abort_code));
-                               continue;
+       dlerror(); /* clear error */
+       transport = dlsym(trans, "kafs_transport");
+       err = dlerror();
+       if (err)
+               goto error_2;
+
+       if (memcmp(transport->id, transport_id, sizeof(transport_id)) != 0) {
+               fprintf(stderr, "Transport '%s' '%s' %zu\n",
+                       transport->id, transport_id, sizeof(transport_id));
+               err = "Transport has incorrect ID string\n";
+               goto error_2;
+       }
+       return 0;
 
-                       case RXRPC_BUSY:
-                               printf("RXRPC_BUSY");
-                               if (n != 0)
-                                       goto dump_data_colon;
-                               goto print_nl;
-
-                       case RXRPC_LOCAL_ERROR:
-                               printf("RXRPC_LOCAL_ERROR: ");
-                               if (n != sizeof(abort_code))
-                                       goto dump_data;
-                               memcpy(&abort_code, p, sizeof(abort_code));
-                               printf("%s\n", strerror(abort_code));
-                               continue;
+error_2:
+       dlclose(trans);
+error:
+       fprintf(stderr, "%s: %s\n", name, err);
+       errno = -ELIBBAD;
+       return -1;
+}
 
-                       default:
-                               break;
-                       }
-               }
+/*
+ * Set up a new transport
+ */
+struct kafs_transport_handle *rx_new_transport(const struct sockaddr *local_sa,
+                                              socklen_t local_salen)
+{
 
-               printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type);
+       if (!transport) {
+               fprintf(stderr, "No Rx transport loaded\n");
+               errno = EINVAL;
+               return NULL;
+       }
 
-       dump_data_colon:
-               printf(": ");
-       dump_data:
-               printf("{");
-               for (; n > 0; n--, p++)
-                       printf("%02x", *p);
+       return transport->init_transport(local_sa, local_salen);
+}
 
-       print_nl:
-               printf("}\n");
-       }
+/*
+ * Destroy a transport
+ */
+void rx_free_transport(struct kafs_transport_handle *trans)
+{
+       transport->free_transport(trans);
 }
 
 /*
  * Set up a new connection
  */
-struct rx_connection *rx_new_connection(const struct sockaddr *sa,
+struct rx_connection *rx_new_connection(struct kafs_transport_handle *trans_handle,
+                                       const struct sockaddr *sa,
                                        socklen_t salen,
                                        uint16_t service,
-                                       uint16_t local_port,
-                                       uint16_t local_service,
                                        int exclusive,
-                                       const char *key,
-                                       int security)
+                                       const char *cellname,
+                                       enum kafs_connection_auth_level level)
 {
-       struct sockaddr_rxrpc srx;
-       struct rx_connection *z_conn;
-       int ret;
-
-       z_conn = calloc(1, sizeof(*z_conn));
-       if (!z_conn)
-               return NULL;
-
-       z_conn->peer.srx_family = AF_RXRPC;
-       z_conn->peer.srx_service = service;
-       z_conn->peer.transport_type = SOCK_DGRAM;
-       z_conn->peer.transport_len = sizeof(srx.transport.sin);
-
+       struct rx_connection *conn;
+       const struct sockaddr_in *sin;
+       const struct sockaddr_in6 *sin6;
+       struct kafs_security_handle *security;
+       
+       /* Do some basic parameter checking */
        switch (sa->sa_family) {
        case 0:
                errno = EDESTADDRREQ;
-               goto error_conn;
-       case AF_INET:
-               if (salen != sizeof(struct sockaddr_in))
-                       goto inval;
-               break;
-       case AF_INET6:
-               if (salen != sizeof(struct sockaddr_in6))
-                       goto inval;
-               break;
-       default:
-               errno = EPROTOTYPE;
-               goto error_conn;
-       }
-
-       if (security < RXRPC_SECURITY_PLAIN ||
-           security > RXRPC_SECURITY_ENCRYPT)
-               goto inval;
-
-       memcpy(&z_conn->peer.transport, sa, salen);
-       switch (sa->sa_family) {
+               return NULL;
        case AF_INET:
-               if (!z_conn->peer.transport.sin.sin_port) {
+               if (salen != sizeof(struct sockaddr_in)) {
+                       errno = EINVAL;
+                       return NULL;
+               }
+               sin = (const struct sockaddr_in *)sa;
+               if (!sin->sin_port) {
                        errno = EDESTADDRREQ;
-                       goto error_conn;
+                       return NULL;
                }
                break;
        case AF_INET6:
-               if (!z_conn->peer.transport.sin6.sin6_port) {
+               if (salen != sizeof(struct sockaddr_in6)) {
+                       errno = EINVAL;
+                       return NULL;
+               }
+               sin6 = (const struct sockaddr_in6 *)sa;
+               if (!sin6->sin6_port) {
                        errno = EDESTADDRREQ;
-                       goto error_conn;
+                       return NULL;
                }
                break;
+       default:
+               errno = EPROTOTYPE;
+               return NULL;
        }
 
-       /* Open up a socket for talking to the AF_RXRPC module */
-       z_conn->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET);
-       if (z_conn->fd < 0)
-                       goto error_conn;
-
-       if (exclusive) {
-               ret = setsockopt(z_conn->fd,
-                                SOL_RXRPC, RXRPC_EXCLUSIVE_CONNECTION,
-                                NULL, 0);
-               if (ret == -1)
-                       goto error_conn;
+       if (level < kafs_connection_no_auth ||
+           level > kafs_connection_encrypt) {
+               errno = EINVAL;
+               return NULL;
        }
 
-       if (key) {
-               ret = setsockopt(z_conn->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
-                                &security, sizeof(security));
-               if (ret == -1)
-                       goto error_conn;
-
-               ret = setsockopt(z_conn->fd, SOL_RXRPC, RXRPC_SECURITY_KEY,
-                                key, strlen(key));
-               if (ret == -1)
-                       goto error_conn;
-       }
+       /* Set up our connection tracking */
+       conn = calloc(1, sizeof(*conn));
+       if (!conn)
+               return NULL;
 
-       /* Bind an address to the local endpoint */
-       memset(&srx, 0, sizeof(srx));
-       srx.srx_family = AF_RXRPC;
-       srx.srx_service = local_service;
-       srx.transport_type = SOCK_DGRAM;
-       srx.transport_len = salen;
-       srx.transport.sin.sin_family = sa->sa_family;
-       switch (sa->sa_family) {
-       case AF_INET:
-               srx.transport.sin.sin_port = htons(local_port);
-               break;
-       case AF_INET6:
-               srx.transport.sin6.sin6_port = htons(local_port);
-               break;
-       }
+       /* Create a security handle */
+       security = transport->new_security(trans_handle, cellname, level);
+       if (!security)
+               goto error_conn;
 
-       ret = bind(z_conn->fd, (struct sockaddr *)&srx, sizeof(srx));
-       if (ret < 0)
-               goto error_fd;
+       /* Open the transport connection */
+       conn->handle = transport->open_connection(trans_handle,
+                                                 security,
+                                                 sa, salen,
+                                                 service,
+                                                 exclusive);
+       transport->free_security(security);
+       if (!conn->handle)
+               goto error_conn;
 
-       return z_conn;
+       return conn;
 
-inval:
-       errno = EINVAL;
-       goto error_conn;
-error_fd:
-       close(z_conn->fd);
 error_conn:
-       free(z_conn);
+       free(conn);
        return NULL;
 }
 
@@ -241,17 +177,17 @@ error_conn:
  * Close an RxRPC client connection.  This will cause all outstanding
  * operations to be aborted by the kernel..
  */
-void rx_close_connection(struct rx_connection *z_conn)
+void rx_close_connection(struct rx_connection *conn)
 {
-       close(z_conn->fd);
-       free(z_conn);
+       transport->close_connection(conn->handle);
+       free(conn);
 }
 
 /*
- * Allocate a call structure.  It is given one buffer ready to go.
+ * Create a call structure with which to make a call.  It is given one buffer
+ * ready to go.
  */
-struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn,
-                                int incoming_call)
+struct rx_call *rxrpc_make_call(struct rx_connection *conn)
 {
        struct rx_call *call;
        struct rx_buf *buf;
@@ -259,43 +195,107 @@ struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn,
 
        call = calloc(1, sizeof(struct rx_call));
        if (!call)
-               return NULL;
+               goto error_0;
 
        buf = calloc(1, sizeof(struct rx_buf));
-       if (!buf) {
-               free(call);
-               return NULL;
-       }
+       if (!buf)
+               goto error_1;
 
        data = malloc(RXGEN_BUFFER_SIZE);
-       if (!data) {
-               free(buf);
-               free(call);
-               return NULL;
-       }
+       if (!data)
+               goto error_2;
+
+       buf->magic = RXGEN_BUF_MAGIC;
+       buf->buf = data;
+
+       call->magic = RXGEN_CALL_MAGIC;
+       call->conn = conn;
+       call->state = rx_call_cl_not_started;
+       call->more_send = true;
+       call->more_recv = true;
+       call->buffer_head = buf;
+       call->buffer_tail = buf;
+       call->data_start = data;
+       call->data_cursor = data;
+       call->data_stop = data + RXGEN_BUFFER_SIZE;
+       call->buffer_space = RXGEN_BUFFER_SIZE;
+
+       call->handle = transport->make_call(conn->handle, rxrpc_data_ready, call);
+       if (!call->handle)
+               goto error_3;
 
        debug("Alloc: buf=%p data=%p\n", buf, data);
+       return call;
+
+error_3:
+       free(data);
+error_2:
+       free(buf);
+error_1:
+       free(call);
+error_0:
+       return NULL;
+}
+
+/*
+ * Create a call structure with which to accept a call.  It is given one buffer
+ * ready to go.
+ */
+#if 0
+struct rx_call *rxrpc_accept_call(struct kafs_service_handle *service_handle,
+                                 void *caller_data,
+                                 const struct sockaddr *peer,
+                                 socklen_t peerlen)
+{
+       struct rx_service *service = caller_data;
+       struct rx_call *call;
+       struct rx_buf *buf;
+       uint8_t *data;
+
+       call = calloc(1, sizeof(struct rx_call));
+       if (!call)
+               goto error_0;
+
+       buf = calloc(1, sizeof(struct rx_buf));
+       if (!buf)
+               goto error_1;
+
+       data = malloc(RXGEN_BUFFER_SIZE);
+       if (!data)
+               goto error_2;
 
        buf->magic = RXGEN_BUF_MAGIC;
        buf->buf = data;
 
-       if (incoming_call)
-               call->state = rx_call_sv_not_started;
-       else
-               call->state = rx_call_cl_not_started;
-
        call->magic = RXGEN_CALL_MAGIC;
-       call->conn = z_conn;
+       call->conn = conn;
+       call->state = rx_call_sv_not_started;
        call->more_send = true;
        call->more_recv = true;
        call->buffer_head = buf;
        call->buffer_tail = buf;
        call->data_start = data;
        call->data_cursor = data;
-       call->data_stop = incoming_call ? data : data + RXGEN_BUFFER_SIZE;
+       call->data_stop = data;
        call->buffer_space = RXGEN_BUFFER_SIZE;
+
+       call->handle = transport->accept_call(service_handle, peer, peerlen);
+       if (!call->handle)
+               goto error_3;
+
+       debug("Alloc: buf=%p data=%p\n", buf, data);
        return call;
+
+error_3:
+       free(data);
+error_2:
+       free(buf);
+error_1:
+       free(call);
+error_0:
+       return NULL;
 }
+#endif
 
 static void rxrpc_check_buf(const struct rx_buf *buf)
 {
@@ -329,9 +329,6 @@ static void rxrpc_check_call(const struct rx_call *call)
 int rxrpc_send_data(struct rx_call *call)
 {
        struct rx_buf *cursor;
-       struct msghdr msg;
-       size_t ctrllen, i;
-       unsigned char control[128];
        struct iovec iov[16];
        unsigned more;
        int ioc, ret;
@@ -354,17 +351,6 @@ int rxrpc_send_data(struct rx_call *call)
        }
 
 more_to_send:
-       /* Request an operation */
-       ctrllen = 0;
-       RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
-
-       msg.msg_name            = &call->conn->peer;
-       msg.msg_namelen         = sizeof(call->conn->peer);
-       msg.msg_iov             = iov;
-       msg.msg_control         = control;
-       msg.msg_controllen      = ctrllen;
-       msg.msg_flags           = 0;
-
        /* We may have a completely sent buffer on the front of the queue if
         * this was the last buffer on the last send.  The buffer queue isn't
         * allowed to be empty at any point.
@@ -380,7 +366,7 @@ more_to_send:
                free(sent);
        }
 
-       more = MSG_MORE;
+       more = 1;
        for (ioc = 0; ioc < 16; ioc++) {
                rxrpc_check_buf(cursor);
 
@@ -404,17 +390,8 @@ more_to_send:
                }
                cursor = cursor->next;
        }
-       msg.msg_iovlen = ioc;
-
-       /* Send the data */
-       //dump_cmsg(&msg);
-
-       for (i = 0; i < msg.msg_iovlen; i++)
-               debug("IOV[%02zu] %04zu %p\n",
-                     i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base);
 
-       ret = sendmsg(call->conn->fd, &msg, more);
-       debug("SENDMSG: %d%s\n", ret, more ? " [more]" : "");
+       ret = transport->send_data(call->handle, iov, ioc, more);
        if (ret == -1)
                return -1;
 
@@ -470,63 +447,16 @@ more_to_send:
 }
 
 /*
- * Receive data from a socket.
+ * Call back from the transport to indicate that data is ready.
  */
-int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait)
+static void rxrpc_data_ready(struct kafs_call_handle *call_handle, void *caller_data)
 {
-       struct rx_call *call;
+       struct rx_call *call = caller_data;
        struct rx_buf *bufs[4] = { NULL }, *cursor;
-       struct sockaddr_rxrpc srx;
-       struct cmsghdr *cmsg;
-       struct msghdr msg;
        struct iovec iov[4];
-       unsigned char control[128];
-       uint32_t tmpbuf[1];
-       int ioc, ret;
-
-       /* Peek at the next message */
-       iov[0].iov_base = &tmpbuf;
-       iov[0].iov_len = sizeof(tmpbuf);
-
-       memset(&msg, 0, sizeof(msg));
-       msg.msg_iov     = iov;
-       msg.msg_iovlen  = 1;
-       msg.msg_name    = &srx;
-       msg.msg_namelen = sizeof(srx);
-       msg.msg_control = control;
-       msg.msg_controllen = sizeof(control);
-       msg.msg_flags   = 0;
-
-       ret = recvmsg(z_conn->fd, &msg, MSG_PEEK | (nowait ? MSG_DONTWAIT : 0));
-       debug("RECVMSG PEEK: %d\n", ret);
-       if (ret == -1)
-               return -1;
-
-       /* Find the call ID. */
-       call = NULL;
-       for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
-               unsigned char *p;
-
-               if (cmsg->cmsg_level != SOL_RXRPC ||
-                   cmsg->cmsg_type != RXRPC_USER_CALL_ID)
-                       continue;
-
-               p = CMSG_DATA(cmsg);
-               call = *(struct rx_call **)p;
-               break;
-       }
-       if (!call)
-               abort();
-
-       /* Now actually retrieve that message */
-       memset(&msg, 0, sizeof(msg));
-       msg.msg_iov     = iov;
-       msg.msg_iovlen  = 0;
-       msg.msg_name    = &srx;
-       msg.msg_namelen = sizeof(srx);
-       msg.msg_control = control;
-       msg.msg_controllen = sizeof(control);
-       msg.msg_flags   = 0;
+       enum kafs_recv_condition cond;
+       uint32_t supplementary_info;
+       int i, ioc, ret;
 
        debug("Recv: buf[0]=%p data[0]=%p (io=%u)\n",
              call->buffer_tail, call->buffer_tail->buf, call->buffer_tail->io_cursor);
@@ -538,73 +468,69 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait)
                bufs[0] = call->buffer_tail;
                iov[0].iov_base = bufs[0]->buf + bufs[0]->io_cursor;
                iov[0].iov_len = RXGEN_BUFFER_SIZE - bufs[0]->io_cursor;
-               msg.msg_iovlen = 1;
+               ioc = 1;
        } else {
-               msg.msg_iovlen = 0;
+               ioc = 0;
        }
 
-       for (ioc = msg.msg_iovlen; ioc < 4; ioc++) {
-               bufs[ioc] = calloc(1, sizeof(struct rx_buf));
-               bufs[ioc]->magic = RXGEN_BUF_MAGIC;
-               if (!bufs[ioc]) {
-                       while (--ioc >= (int)msg.msg_iovlen) {
-                               bufs[ioc]->magic = RXGEN_BUF_DEAD;
-                               free(bufs[ioc]->buf);
-                               free(bufs[ioc]);
+       for (i = ioc; i < 4; i++) {
+               bufs[i] = calloc(1, sizeof(struct rx_buf));
+               bufs[i]->magic = RXGEN_BUF_MAGIC;
+               if (!bufs[i]) {
+                       while (--i >= ioc) {
+                               bufs[i]->magic = RXGEN_BUF_DEAD;
+                               free(bufs[i]->buf);
+                               free(bufs[i]);
                        }
-                       return -1;
+                       return;
                }
-               bufs[ioc]->buf = malloc(RXGEN_BUFFER_SIZE);
-               if (!bufs[ioc]->buf) {
-                       bufs[ioc]->magic = RXGEN_BUF_DEAD;
-                       free(bufs[ioc]);
-                       while (--ioc >= (int)msg.msg_iovlen) {
-                               bufs[ioc]->magic = RXGEN_BUF_DEAD;
-                               free(bufs[ioc]->buf);
-                               free(bufs[ioc]);
+               bufs[i]->buf = malloc(RXGEN_BUFFER_SIZE);
+               if (!bufs[i]->buf) {
+                       bufs[i]->magic = RXGEN_BUF_DEAD;
+                       free(bufs[i]);
+                       while (--i >= ioc) {
+                               bufs[i]->magic = RXGEN_BUF_DEAD;
+                               free(bufs[i]->buf);
+                               free(bufs[i]);
                        }
-                       return -1;
+                       return;
                }
-               iov[ioc].iov_base = bufs[ioc]->buf;
-               iov[ioc].iov_len = RXGEN_BUFFER_SIZE;
+               iov[i].iov_base = bufs[i]->buf;
+               iov[i].iov_len = RXGEN_BUFFER_SIZE;
        }
-       msg.msg_iovlen = 4;
+       ioc = 4;
 
-       ret = recvmsg(z_conn->fd, &msg, 0);
-       debug("RECVMSG: %d\n", ret);
+       ret = transport->recv_data(call->handle, iov, ioc, &cond, &supplementary_info);
+       debug("RECVDATA: %d\n", ret);
        if (ret == -1)
-               return -1;
-
-       debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags);
-       debug("CMSG: %zu\n", msg.msg_controllen);
-       debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len);
+               return;
 
        call->bytes_received += ret;
 
        /* Attach any used buffers to the call and discard the rest */
        if (ret > 0) {
-               for (ioc = 0; ioc < 4 && ret > 0; ioc++) {
-                       unsigned added = RXGEN_BUFFER_SIZE - bufs[ioc]->io_cursor;
-                       debug("xfer[%d] space=%u rem=%d\n", ioc, added, ret);
+               for (i = 0; i < 4 && ret > 0; i++) {
+                       unsigned added = RXGEN_BUFFER_SIZE - bufs[i]->io_cursor;
+                       debug("xfer[%d] space=%u rem=%d\n", i, added, ret);
                        if (added > (unsigned)ret)
                                added = (unsigned)ret;
-                       bufs[ioc]->io_cursor += added;
+                       bufs[i]->io_cursor += added;
                        call->data_count += added;
                        ret -= added;
-                       if (call->buffer_tail == bufs[ioc])
+                       if (call->buffer_tail == bufs[i])
                                continue;
-                       call->buffer_tail->next = bufs[ioc];
-                       call->buffer_tail = bufs[ioc];
+                       call->buffer_tail->next = bufs[i];
+                       call->buffer_tail = bufs[i];
                        if (ret <= 0) {
-                               ioc++;
+                               i++;
                                break;
                        }
                }
 
-               for (; ioc < 4; ioc++) {
-                       bufs[ioc]->magic = RXGEN_BUF_DEAD;
-                       free(bufs[ioc]->buf);
-                       free(bufs[ioc]);
+               for (; i < 4; i++) {
+                       bufs[i]->magic = RXGEN_BUF_DEAD;
+                       free(bufs[i]->buf);
+                       free(bufs[i]);
                }
        }
 
@@ -614,75 +540,52 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait)
                debug("RecvQ buf=%p data=%p iocur=%u\n",
                      cursor, cursor->buf, cursor->io_cursor);
 
-       /* Process the metadata */
-       if (msg.msg_flags & MSG_EOR)
+       /* Process the call state given to us by the receiver. */
+       switch (cond) {
+       case kafs_recv_more:
+               break;
+       case kafs_recv_final_ack:
+               if (call->state == rx_call_sv_waiting_for_final_ack) {
+                       call->state = rx_call_sv_complete;
+               } else {
+                       fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n",
+                               call->state);
+                       abort();
+               }
+       case kafs_recv_complete:
                call->known_to_kernel = 0;
-       if (!(msg.msg_flags & MSG_MORE))
+       case kafs_recv_complete_now_send:
                call->more_recv = false;
+               break;
 
-       for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
-               unsigned char *p;
-               int n;
-
-               if (cmsg->cmsg_level != SOL_RXRPC)
-                       continue;
-
-               n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
-               p = CMSG_DATA(cmsg);
-
-               switch (cmsg->cmsg_type) {
-               case RXRPC_ABORT:
-                       if (n != sizeof(call->abort_code))
-                               call->abort_code = 0;
-                       else
-                               memcpy(&call->abort_code, p, sizeof(call->abort_code));
-                       z_conn->last_abort_code = call->abort_code;
-                       call->error_code = ECONNABORTED;
-                       call->state = rx_call_remotely_aborted;
-                       break;
-
-               case RXRPC_NET_ERROR:
-                       if (n != sizeof(ret)) {
-                               errno = EBADMSG;
-                               return -1;
-                       }
-                       memcpy(&ret, p, sizeof(ret));
-                       call->error_code = ret;
-                       call->state = rx_call_net_error;
-                       break;
-
-               case RXRPC_LOCAL_ERROR:
-                       if (n != sizeof(ret)) {
-                               errno = EBADMSG;
-                               return -1;
-                       }
-                       memcpy(&ret, p, sizeof(ret));
-                       call->error_code = ret;
-                       call->state = rx_call_local_error;
-                       break;
-
-               case RXRPC_BUSY:
-                       call->error_code = ECONNREFUSED;
-                       call->state = rx_call_rejected_busy;
-                       break;
-
-               case RXRPC_ACK:
-                       if (call->state == rx_call_sv_waiting_for_final_ack) {
-                               call->state = rx_call_sv_complete;
-                       } else {
-                               fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n",
-                                       call->state);
-                               abort();
-                       }
-                       break;
-
-               case RXRPC_RESPONSE:
-                       call->secured = 1;
-                       break;
-
-               default:
-                       break;
-               }
+       case kafs_recv_abort:
+               call->conn->last_abort_code = supplementary_info;
+               call->abort_code = supplementary_info;
+               call->error_code = ECONNABORTED;
+               call->state = rx_call_remotely_aborted;
+               call->known_to_kernel = 0;
+               call->more_recv = false;
+               break;
+       case kafs_recv_net_error:
+               call->error_code = supplementary_info;
+               call->state = rx_call_net_error;
+               call->known_to_kernel = 0;
+               call->more_recv = false;
+               break;
+       case kafs_recv_local_error:
+               call->error_code = supplementary_info;
+               call->state = rx_call_local_error;
+               call->known_to_kernel = 0;
+               call->more_recv = false;
+               break;
+       case kafs_recv_busy:
+               call->error_code = ECONNREFUSED;
+               call->state = rx_call_rejected_busy;
+               call->known_to_kernel = 0;
+               call->more_recv = false;
+               break;
+       default:
+               abort();
        }
 
        /* Switch into appropriate decode state */
@@ -699,7 +602,7 @@ loop:
                         * and is willing to settle for nothing also.
                         */
                        if (call->data_count <= 0 && call->more_recv)
-                               return 0;
+                               return;
                } else if (call->data_count < call->need_size) {
                        if (!call->more_recv) {
                                /* Short data */
@@ -707,7 +610,7 @@ loop:
                                call->error_code = ENODATA;
                                rxrpc_abort_call(call, 1);
                        }
-                       return 0;
+                       return;
                }
 
                /* Handle reception of the opcode in a server-side call.  Note
@@ -721,6 +624,7 @@ loop:
                        goto loop;
                }
 
+               debug("DEC %u > %u\n", call->data_count, call->need_size);
                ret = call->decoder(call);
                if (ret < 0) {
                        rxrpc_abort_call(call, 1);
@@ -746,7 +650,7 @@ loop:
 
                                /* Invoke the processor */
                                call->processor(call);
-                               return 0;
+                               return;
                        }
                }
                break;
@@ -757,14 +661,12 @@ loop:
 
        case rx_call_sv_processing:
                call->failed(call);
-               return 0;
+               return;
 
        default:
                fprintf(stderr, "RxRPC: Recv in bad call state (%d)\n", call->state);
                abort();
        }
-
-       return 0;
 }
 
 /*
@@ -772,25 +674,9 @@ loop:
  */
 void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code)
 {
-       struct msghdr msg;
-       size_t ctrllen;
-       unsigned char control[128];
 
        if (call->known_to_kernel) {
-               memset(control, 0, sizeof(control));
-               ctrllen = 0;
-               RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
-               RXRPC_ADD_ABORT(control, ctrllen, abort_code);
-
-               msg.msg_name            = &call->conn->peer;
-               msg.msg_namelen         = sizeof(call->conn->peer);
-               msg.msg_iov             = NULL;
-               msg.msg_iovlen          = 0;
-               msg.msg_control         = control;
-               msg.msg_controllen      = ctrllen;
-               msg.msg_flags           = 0;
-
-               sendmsg(call->conn->fd, &msg, 0);
+               transport->abort_call(call->handle, abort_code);
                call->known_to_kernel = 0;
        }
        call->state = rx_call_locally_aborted;
@@ -806,6 +692,7 @@ void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code)
        rxrpc_check_call(call);
        if (call->known_to_kernel)
                rxrpc_abort_call(call, abort_code);
+       transport->terminate_call(call->handle);
        call->magic = 0x7a7b7c7d;
        for (cursor = call->buffer_head; cursor; cursor = next) {
                rxrpc_check_buf(cursor);
@@ -819,26 +706,26 @@ void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code)
        free(call);
 }
 
+/*
+ * Poll the transport and receive data if there's anything available.
+ */
+int rxrpc_maybe_recv_data(struct rx_connection *conn)
+{
+       return transport->poll_connection(conn->handle, true);
+}
+
 /*
  * Run a single call synchronously.
  */
 int rxrpc_run_sync_call(struct rx_call *call)
 {
-       struct pollfd fds[1];
+       int ret;
 
        while (call->known_to_kernel) {
-               fds[0].fd = call->conn->fd;
-               fds[0].events = POLLIN;
-               fds[0].revents = 0;
-
-               if (poll(fds, 1, -1) == -1) {
-                       fprintf(stderr, "Poll failed: %m\n");
-                       return -1;
-               }
-               if (rxrpc_recv_data(call->conn, false)) {
-                       fprintf(stderr, "rxrpc_recv_data failed: %m\n");
-                       return -1;
-               }
+               debug("Run call %d\n", call->known_to_kernel);
+               ret = transport->poll_connection(call->conn->handle, false);
+               if (ret < 0)
+                       return ret;
        }
 
        switch (call->state) {
@@ -1002,15 +889,25 @@ void rxrpc_dec_advance_buffer(struct rx_call *call)
         */
        cursor = call->buffer_head;
        segment = cursor->io_cursor;
+       debug("DEC Advance %u\n", segment);
        new_stop = cursor->buf + segment;
        if (call->data_stop >= new_stop) {
                /* This buffer must then be completely used as we're required to check
                 * amount received before reading it.
                 */
-               if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
-                       abort(); /* Didn't completely consume a buffer */
-               if (call->buffer_tail == cursor)
-                       abort(); /* Unexpectedly out of data */
+               if (call->data_stop > new_stop) {
+                       debug("buffer overrun (%p > %p)\n", call->data_stop, new_stop);
+                       abort();
+               }
+               if (new_stop != cursor->buf + cursor->io_cursor) {
+                       debug("buffer not consumed (%p != %p)\n",
+                             new_stop, cursor->buf + cursor->io_cursor);
+                       abort();
+               }
+               if (call->buffer_tail == cursor) {
+                       debug("unexpectedly out of data\n");
+                       abort();
+               }
 
                /* Move to the next buffer */
                rxrpc_post_dec(call);
diff --git a/kafs/transport.h b/kafs/transport.h
new file mode 100644 (file)
index 0000000..87abdf8
--- /dev/null
@@ -0,0 +1,124 @@
+/* Access points into the dynamically loaded Rx transport module
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef _KAFS_TRANSPORT_H
+#define _KAFS_TRANSPORT_H
+
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+
+struct kafs_transport_handle;
+struct kafs_security_handle;
+struct kafs_connection_handle;
+struct kafs_call_handle;
+
+enum kafs_connection_auth_level {
+       kafs_connection_no_auth,
+       kafs_connection_local_auth,
+       kafs_connection_clear,
+       kafs_connection_integrity_only,
+       kafs_connection_encrypt,
+};
+
+enum kafs_recv_condition {
+       kafs_recv_more,
+       kafs_recv_final_ack,
+       kafs_recv_complete,
+       kafs_recv_complete_now_send,
+       kafs_recv_abort,
+       kafs_recv_net_error,
+       kafs_recv_local_error,
+       kafs_recv_busy,
+};
+
+#define kafs_transport_id "kAFS-utils tran"
+
+struct kafs_transport_handle;
+struct kafs_security_handle;
+struct kafs_connection_handle;
+struct kafs_service_handle;
+struct kafs_call_handle;
+
+typedef void (*kafs_incoming_call_func)(struct kafs_service_handle *service,
+                                       void *caller_data,
+                                       const struct sockaddr *peer,
+                                       socklen_t *peerlen);
+
+typedef void (*kafs_data_ready_func)(struct kafs_call_handle *call,
+                                    void *caller_data);
+
+struct kafs_transport {
+       char id[16];    /* = kafs_transport_id */
+       char name[16];  /* Name of transport */
+
+       struct kafs_transport_handle *(*init_transport)(const struct sockaddr *sa,
+                                                       socklen_t salen);
+
+       void (*free_transport)(struct kafs_transport_handle *transport);
+
+       struct kafs_security_handle *(*new_security)(struct kafs_transport_handle *transport,
+                                                    const char *cell_name,
+                                                    enum kafs_connection_auth_level level);
+
+       void (*free_security)(struct kafs_security_handle *security);
+
+       struct kafs_connection_handle *(*open_connection)(struct kafs_transport_handle *transport,
+                                                         struct kafs_security_handle *security,
+                                                         const struct sockaddr *sa,
+                                                         socklen_t salen,
+                                                         uint16_t service,
+                                                         int exclusive_connection);
+
+       void (*close_connection)(struct kafs_connection_handle *connection);
+
+       struct kafs_call_handle *(*make_call)(struct kafs_connection_handle *connection,
+                                             kafs_data_ready_func data_ready_func,
+                                             void *caller_data);
+
+       struct kafs_service_handle *(*new_service)(struct kafs_transport_handle *transport,
+                                                  kafs_incoming_call_func new_call_func,
+                                                  void *caller_data,
+                                                  uint16_t local_port,
+                                                  uint16_t local_service,
+                                                  struct kafs_security_handle **permits);
+
+       void (*shutdown_service)(struct kafs_service_handle *service);
+       
+       struct kafs_call_handle *(*accept_call)(struct kafs_service_handle *service,
+                                               kafs_data_ready_func data_ready_func,
+                                               void *caller_data,
+                                               struct sockaddr *_sa,
+                                               socklen_t *_salen);
+
+       void (*abort_call)(struct kafs_call_handle *call,
+                          uint32_t abort_code);
+
+       void (*terminate_call)(struct kafs_call_handle *call);
+
+       int (*send_data)(struct kafs_call_handle *call,
+                        struct iovec *iov,
+                        int ioc,
+                        int more);
+
+       int (*recv_data)(struct kafs_call_handle *call,
+                        struct iovec *iov,
+                        int ioc,
+                        enum kafs_recv_condition *_cond,
+                        uint32_t *_supplementary_info);
+
+       int (*poll_connection)(struct kafs_connection_handle *conn,
+                             int nowait);
+};
+
+extern struct kafs_transport kafs_transport;
+
+#endif /* _KAFS_TRANSPORT_H */
index b386bc9710ba8248e159aaa753444553071ad89e..c362f6e916961a81544c328fb940a9401a0dd81f 100644 (file)
@@ -481,7 +481,7 @@ def emit_func_send(o, func, what):
     # Allocate call
     if what == "request":
         o.rxsrc("\n")
-        o.rxsrc("\tcall = rxrpc_alloc_call(z_conn, 0);\n")
+        o.rxsrc("\tcall = rxrpc_make_call(z_conn);\n")
         o.rxsrc("\tif (!call)\n")
         o.rxsrc("\t\treturn ", bad_ret, ";\n")
         o.rxsrc("\tcall->decoder = rxgen_decode_", func.name, "_response;\n")
index aa669b1acdc1dbe40e7d3755a9136dca5615c2ce..af63a6e9d6d766c7978ad0a2637726b88ac354e4 100644 (file)
@@ -72,6 +72,8 @@ def emit_py_module(o):
     o.pysrc(" */\n")
     o.pysrc("static PyMethodDef module_methods[] = {\n")
 
+    o.pysrc("\t{\"load_kafs_transport\", (PyCFunction)kafs_py_load_kafs_transport, METH_VARARGS, \"\" },\n")
+    o.pysrc("\t{\"rx_new_transport\", (PyCFunction)kafs_py_rx_new_transport, METH_VARARGS, \"\" },\n")
     o.pysrc("\t{\"rx_new_connection\", (PyCFunction)kafs_py_rx_new_connection, METH_VARARGS, \"\" },\n")
     o.pysrc("\t{\"afs_string_to_key\", (PyCFunction)kafs_py_string_to_key, METH_VARARGS, \"\" },\n")
 
@@ -104,6 +106,7 @@ def emit_py_module(o):
     # Load types
     if o.xdr.py_type_defs:
         o.pysrc("\tif (")
+        o.pysrc("PyType_Ready(&py_rx_transportType) < 0 ||\n\t    ")
         o.pysrc("PyType_Ready(&py_rx_connectionType) < 0 ||\n\t    ")
         o.pysrc("PyType_Ready(&py_rx_split_infoType) < 0")
         for pyt in o.xdr.py_type_defs:
index 61a7244e130dcb18d48e51bc96dd77dfcf295a07..902897a803fa40583d9d8bb07142e50dc5fd4dc9 100644 (file)
@@ -342,7 +342,7 @@ def emit_py_func_simple_sync_call(o, func):
         o.pysrc("\t\treturn NULL;\n")
 
     o.pysrc("\n")
-    o.pysrc("\tcall = rxrpc_alloc_call(z_conn->x, 0);\n")
+    o.pysrc("\tcall = rxrpc_make_call(z_conn->x);\n")
     o.pysrc("\tif (!call) {\n")
     if func.split:
         o.pysrc("\t\tPy_XDECREF(split_info);\n");
index edc5a34818d991292181e0b104ed2c2d67114881..db0e0619383e73d0dd142a5d777694612565df2d 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -47,9 +47,10 @@ setup(name = "kafs",
                                            "kafs/rxgen_afs_py.c",
                                            "kafs/py_passwd.c",
                                            "kafs/py_rxgen.c",
+                                           "kafs/py_rxtrans.c",
                                            "kafs/py_rxconn.c",
                                            "kafs/py_rxsplit.c",
-                                           "kafs/af_rxrpc.c"
+                                           "kafs/transport.c"
                                        ],
                                extra_compile_args = [
                                    "-O0",