]> www.infradead.org Git - users/dhowells/kafs-utils.git/commitdiff
Implement basic I/O on top of an AF_RXRPC socket
authorDavid Howells <dhowells@redhat.com>
Mon, 30 Sep 2019 09:13:00 +0000 (10:13 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 5 May 2023 09:06:45 +0000 (10:06 +0100)
Implement basic I/O on top of an AF_RXRPC socket so that the codecs
generated by rxgen.py will work.

Signed-off-by: David Howells <dhowells@redhat.com>
.gitignore
Makefile [new file with mode: 0644]
lib/Makefile [new file with mode: 0644]
lib/af_rxrpc.C [new file with mode: 0644]
lib/rxrpc.H [new file with mode: 0644]
lib/rxrpc_core.C [new file with mode: 0644]
lib/rxrpc_xdr.C [new file with mode: 0644]
lib/transport.H [new file with mode: 0644]

index b25c15b81fae06e1c55946ac6270bfdb293870e8..bda489349fdcc941a4aeaa2d05d296a82101d5a9 100644 (file)
@@ -1 +1,5 @@
 *~
+*.o
+.*.o.d
+*.so
+lib/libkafs_utils.*
diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..38fda8f
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,7 @@
+all:
+       $(MAKE) -C lib
+
+clean:
+       $(RM) *~
+       $(MAKE) -C lib clean
+       $(MAKE) -C rxgen clean
diff --git a/lib/Makefile b/lib/Makefile
new file mode 100644 (file)
index 0000000..6e81c71
--- /dev/null
@@ -0,0 +1,20 @@
+CFLAGS         := -g -Wall -Wformat -fpic -O2 -Wno-pointer-arith
+
+LIB_SRCS       := rxrpc_xdr.C af_rxrpc.C rxrpc_core.C
+LIB_OBJS       := $(patsubst %.C,%.o,$(LIB_SRCS))
+
+%.o: %.C
+       $(CXX) $(CPPFLAGS) -MMD -MF .$@.d $(CFLAGS) -o $@ -c $<
+
+all: libkafs_utils.a
+
+libkafs_utils.a: $(LIB_OBJS)
+       $(AR) rcs -o $@ $(LIB_OBJS)
+
+DEPS           := $(wildcard .*.o.d)
+ifneq ($(DEPS),)
+include $(DEPS)
+endif
+
+clean:
+       $(RM) *~ *.o *.so *.a $(DEPS)
diff --git a/lib/af_rxrpc.C b/lib/af_rxrpc.C
new file mode 100644 (file)
index 0000000..7079c68
--- /dev/null
@@ -0,0 +1,985 @@
+/* AF_RXRPC driver
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#define _XOPEN_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <errno.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <linux/rxrpc.h>
+#include "transport.H"
+
+using rxrpc::ref;
+class af_rxrpc_call;
+
+#define debug(fmt, ...) \
+       do { if (rxrpc::debug_transport) printf("AFRX: " fmt, ## __VA_ARGS__); } while (0)
+#define debug_buffer(fmt, ...) \
+       do { if (rxrpc::debug_buffers) printf("BUF: " fmt, ## __VA_ARGS__); } while (0)
+
+#define RXRPC_SELECT_CALL_FOR_RECV     7       /* Specify the call for recvmsg, SIOCINQ, etc. */
+#define RXRPC_SELECT_CALL_FOR_SEND     8       /* Specify the call for splice, SIOCOUTQ, etc. */
+
+#define RXRPC_SET_SECURITY_KEY 14
+#define RXRPC_SET_SECURITY_LEVEL 15
+
+/*
+ * Add a call ID to a control message sequence.
+ */
+#define RXRPC_ADD_CALLID(control, ctrllen, id)                         \
+do {                                                                   \
+       unsigned char *__buffer = (control);                            \
+       unsigned long *__data;                                          \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = (struct cmsghdr *)(__buffer + (ctrllen));              \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(*__data));            \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_USER_CALL_ID;                   \
+       __data = (unsigned long *)CMSG_DATA(__cmsg);                    \
+       *__data = (id);                                                 \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add an abort instruction to a control message sequence.
+ */
+#define RXRPC_ADD_ABORT(control, ctrllen, abort_code)                  \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       unsigned int *__data;                                           \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = (struct cmsghdr *)(__buffer + (ctrllen));              \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(*__data));            \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_ABORT;                          \
+       __data = (unsigned int *)CMSG_DATA(__cmsg);                     \
+       *__data = (abort_code);                                         \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add a security level instruction to a control message sequence.
+ */
+#define RXRPC_ADD_SECURITY_LEVEL(control, ctrllen, sec_level)          \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       unsigned int *__data;                                           \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = (struct cmsghdr *)(__buffer + (ctrllen));              \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(*__data));            \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_SET_SECURITY_LEVEL;             \
+       __data = (void *)CMSG_DATA(__cmsg);                             \
+       *__data = (sec_level);                                          \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add a total transmit length instruction to a control message sequence.
+ */
+#define RXRPC_ADD_TX_LENGTH(control, ctrllen, length)                  \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       unsigned long long __data = (length);                           \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = (struct cmsghdr *)(__buffer + (ctrllen));              \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(__data));             \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_TX_LENGTH;                      \
+       memcpy(CMSG_DATA(__cmsg), &__data, sizeof(__data));             \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add a miscellaneous option to a control message sequence.
+ */
+#define RXRPC_ADD_MISC_OPTION(control, ctrllen, opt)                   \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = (struct cmsghdr *)(__buffer + (ctrllen));              \
+       __cmsg->cmsg_len        = CMSG_LEN(0);                          \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = opt;                                  \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * RxRPC tranport definition.
+ */
+class af_rxrpc : public rxrpc::Transport {
+public:
+       std::string name() const;
+
+       ref<rxrpc::Endpoint> new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+                                               rxrpc::Security *security);
+
+       ref<rxrpc::Security> new_security(const std::string &key_name,
+                                         enum rxrpc::security_auth_level level);
+
+       ~af_rxrpc();
+};
+
+/*
+ * RxRPC local endpoint representation.
+ */
+class af_rxrpc_endpoint : public rxrpc::Endpoint {
+public:
+       struct sockaddr_rxrpc   local;
+       int                     fd;
+
+       /* Message reception infrastructure */
+       std::list<rxrpc::Rx_buffer> rx_spare_buffers;   /* Spare bufferage for receive */
+       unsigned int            rx_nr_buffers;          /* Number of buffers on the list */
+#define CONN_MSGS 3
+#define CONN_CMSG_SIZE 128
+#define CONN_NR_IOV 4
+       struct mmsghdr          rx_msgs[CONN_MSGS];     /* Message headers */
+       struct sockaddr_rxrpc   rx_peer[CONN_MSGS];     /* Mass peer address bufferage */
+       struct iovec            rx_iov[CONN_MSGS * CONN_NR_IOV]; /* Mass data bufferage */
+       unsigned char           rx_cmsg[CONN_MSGS * CONN_CMSG_SIZE]; /* Mass control message bufferage */
+
+       af_rxrpc_endpoint(const struct sockaddr_rxrpc *local_addr,
+                         rxrpc::Security *security);
+       ~af_rxrpc_endpoint()
+               {
+                       int i;
+                       if (fd != -1)
+                               close(fd);
+
+                       for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++)
+                               delete [] (unsigned char *)rx_iov[i].iov_base;
+               }
+
+       ref<rxrpc::Call> new_client_call(struct rxrpc::Call_params *params,
+                                        const char *op_name,
+                                        size_t size);
+       void receive(bool nowait);
+
+       ref<af_rxrpc_call> get_call_from_anciliary(struct msghdr *msg);
+       void recv_msg(struct msghdr *msg, unsigned int len);
+};
+
+/*
+ * AF_RXRPC security descriptor.
+ */
+class af_rxrpc_security : public rxrpc::Security {
+public:
+       unsigned int    min_sec_level;  /* RXRPC_SECURITY_* value */
+       af_rxrpc_security(const std::string &key_name,
+                         enum rxrpc::security_auth_level level)
+               : Security(key_name, level)
+               {
+                       min_sec_level = RXRPC_SECURITY_PLAIN;
+               }
+};
+
+/*
+ * AF_RXRPC call representation.
+ */
+class af_rxrpc_call : public rxrpc::Call {
+public:
+       bool            known_to_kernel;
+
+       af_rxrpc_call(rxrpc::Call_params *params, const char *op_name, size_t size);
+       ~af_rxrpc_call();
+
+       void end(int32_t abort_code);
+       void abort(int32_t abort_code);
+       void send_data(rxrpc::Enc_buffer *buf, bool more);
+       void end_split(void);
+
+       void process_anciliary(struct msghdr *msg);
+       void recv_msg(struct msghdr *msg, unsigned int len);
+};
+
+/*
+ * dump the control messages
+ */
+static __attribute__((unused))
+void dump_cmsg(const char *dir, struct msghdr *msg)
+{
+       struct timespec tv;
+       struct cmsghdr *cmsg;
+       unsigned long long tx_len;
+       unsigned long user_id;
+       unsigned char *p;
+       int abort_code;
+       int n;
+
+       for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+               n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+               p = CMSG_DATA(cmsg);
+
+               debug("%s CMSG: %zu: ", dir, cmsg->cmsg_len);
+
+               switch (cmsg->cmsg_level) {
+               case SOL_SOCKET:
+                       switch (cmsg->cmsg_type) {
+                       case SO_TIMESTAMPNS_NEW:
+                               printf("SO_TIMESTAMPNS_NEW: ");
+                               if (n != sizeof(tv))
+                                       goto dump_data;
+                               memcpy(&tv, p, sizeof(tv));
+                               printf("%lu.%lu\n", tv.tv_sec, tv.tv_nsec);
+                               continue;
+                       default:
+                               printf("SOCKET_%d", cmsg->cmsg_type);
+                               break;
+                       }
+                       break;
+
+               case SOL_RXRPC:
+                       switch (cmsg->cmsg_type) {
+                       case RXRPC_USER_CALL_ID:
+                               printf("RXRPC_USER_CALL_ID: ");
+                               if (n != sizeof(user_id))
+                                       goto dump_data;
+                               memcpy(&user_id, p, sizeof(user_id));
+                               printf("%lx\n", user_id);
+                               continue;
+
+                       case RXRPC_ABORT:
+                               printf("RXRPC_ABORT: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%d\n", abort_code);
+                               continue;
+
+                       case RXRPC_ACK:
+                               printf("RXRPC_ACK");
+                               if (n != 0)
+                                       goto dump_data_colon;
+                               goto print_nl;
+
+                       case RXRPC_NET_ERROR:
+                               printf("RXRPC_NET_ERROR: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%s\n", strerror(abort_code));
+                               continue;
+
+                       case RXRPC_BUSY:
+                               printf("RXRPC_BUSY");
+                               if (n != 0)
+                                       goto dump_data_colon;
+                               goto print_nl;
+
+                       case RXRPC_LOCAL_ERROR:
+                               printf("RXRPC_LOCAL_ERROR: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%s\n", strerror(abort_code));
+                               continue;
+
+                       case RXRPC_UPGRADE_SERVICE:
+                               printf("RXRPC_UPGRADE_SERVICE\n");
+                               continue;
+                       case RXRPC_TX_LENGTH:
+                               printf("RXRPC_TX_LENGTH: ");
+                               if (n != sizeof(tx_len))
+                                       goto dump_data;
+                               memcpy(&tx_len, p, sizeof(tx_len));
+                               printf("%llu\n", tx_len);
+                               continue;
+                       case RXRPC_SET_CALL_TIMEOUT:
+                               printf("RXRPC_SET_CALL_TIMEOUT\n");
+                               continue;
+                       default:
+                               printf("RXRPC_%d", cmsg->cmsg_type);
+                               break;
+                       }
+                       break;
+
+               default:
+                       printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type);
+                       break;
+               }
+
+
+       dump_data_colon:
+               printf(": ");
+       dump_data:
+               printf("{");
+               for (; n > 0; n--, p++)
+                       printf("%02x", *p);
+
+       print_nl:
+               printf("}\n");
+       }
+}
+
+std::string af_rxrpc::name() const
+{
+       return "AF_RXRPC";
+}
+
+/*
+ * Construct a local endpoint.
+ */
+af_rxrpc_endpoint::af_rxrpc_endpoint(const struct sockaddr_rxrpc *local_addr,
+                                    rxrpc::Security *security)
+       : Endpoint(local_addr)
+{
+       int opt, i;
+
+       fd = -1;
+       rx_nr_buffers = 0;
+       memset(rx_msgs, 0, sizeof(rx_msgs));
+       memset(rx_peer, 0, sizeof(rx_peer));
+       memset(rx_iov, 0, sizeof(rx_iov));
+       memset(rx_cmsg, 0, sizeof(rx_cmsg));
+
+       /* Set up the message descriptors */
+       for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++)
+               rx_iov[i].iov_len = RXGEN_BUFFER_SIZE;
+
+       for (i = 0; i < CONN_MSGS; i++) {
+               struct mmsghdr *m = &rx_msgs[i];
+
+               m->msg_hdr.msg_iov      = rx_iov + i * CONN_NR_IOV;
+               m->msg_hdr.msg_iovlen   = CONN_NR_IOV;
+               m->msg_hdr.msg_name     = &rx_peer[i];
+               m->msg_hdr.msg_namelen  = sizeof(rx_peer[i]);
+               m->msg_hdr.msg_control  = rx_cmsg;
+               m->msg_hdr.msg_controllen = CONN_CMSG_SIZE;
+               m->msg_hdr.msg_flags    = 0;
+       }
+
+       /* Open up a socket for talking to the AF_RXRPC module */
+       fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET6);
+       if (fd < 0)
+               throw rxrpc::syserror("socket()");
+
+       try {
+               opt = 1;
+               if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPNS_NEW, (char *)&opt, sizeof(opt)) == -1)
+                       throw rxrpc::syserror("setsockopt(SO_TIMESTAMPNS_NEW)");
+
+               if (address.srx_family &&
+                   bind(fd, (struct sockaddr *)&address, sizeof(address)) == -1)
+                       throw rxrpc::syserror("bind");
+
+               if (security) {
+                       af_rxrpc_security *sec = dynamic_cast<af_rxrpc_security *>(security);
+                       if (sec->level != rxrpc::security_no_auth) {
+                               debug("OPT RXRPC_SECURITY_KEY %s\n", security->key_name.c_str());
+                               if (setsockopt(fd, SOL_RXRPC, RXRPC_SECURITY_KEY,
+                                              security->key_name.c_str(),
+                                              security->key_name.size()) < 0)
+                                       throw rxrpc::syserror("setsockopt(RXRPC_SECURITY_KEY)");
+
+                               debug("OPT RXRPC_MIN_SECURITY_LEVEL %u\n", sec->min_sec_level);
+                               if (setsockopt(fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
+                                              &sec->min_sec_level,
+                                              sizeof(sec->min_sec_level)) < 0)
+                                       throw rxrpc::syserror("setsockopt(RXRPC_MIN_SECURITY_LEVEL)");
+                       }
+               }
+       } catch (...) {
+               close(fd);
+               throw;
+       }
+}
+
+/*
+ * Initialise the transport module
+ */
+ref<rxrpc::Endpoint> af_rxrpc::new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+                                                 rxrpc::Security *security)
+{
+       ref<af_rxrpc_endpoint> endpoint;
+
+       if (local_addr &&
+           local_addr->transport_len) {
+               if (local_addr->transport_len > sizeof(local_addr->transport))
+                       throw std::invalid_argument("Transport address too long");
+
+               switch (endpoint->address.transport.family) {
+               case AF_INET:
+               case AF_INET6:
+                       break;
+               default:
+                       throw std::invalid_argument("Unsupported transport type\n");
+               }
+       }
+
+       return new af_rxrpc_endpoint(local_addr, security);
+}
+
+/*
+ * Set up a security descriptor.  Note that the key name can be NULL (for
+ * instance if -noauth was passed).
+ */
+ref<rxrpc::Security> af_rxrpc::new_security(const std::string &key_name,
+                                           enum rxrpc::security_auth_level level)
+{
+       ref<af_rxrpc_security> security;
+
+       security = new af_rxrpc_security(key_name, level);
+
+       switch (level) {
+       case rxrpc::security_no_auth:
+               break;
+       case rxrpc::security_local_auth:
+               throw std::invalid_argument("Local auth not supported by AF_RXRPC transport");
+       case rxrpc::security_unset:
+       case rxrpc::security_clear:
+               security->min_sec_level = RXRPC_SECURITY_PLAIN;
+               if (security->key_name.size() == 0)
+                       throw std::invalid_argument("Unspecified key name");
+               break;
+       case rxrpc::security_integrity_only:
+               security->min_sec_level = RXRPC_SECURITY_AUTH;
+               if (security->key_name.size() == 0)
+                       throw std::invalid_argument("Unspecified key name");
+               break;
+       case rxrpc::security_encrypt:
+               security->min_sec_level = RXRPC_SECURITY_ENCRYPT;
+               if (security->key_name.size() == 0)
+                       throw std::invalid_argument("Unspecified key name");
+               break;
+       }
+
+       return security;
+}
+
+/*
+ * Construct a client call.
+ */
+af_rxrpc_call::af_rxrpc_call(rxrpc::Call_params *params, const char *op_name, size_t size)
+       : Call(params, op_name, size)
+{
+       known_to_kernel = false;
+}
+
+/*
+ * Allocate a call handle for a new call.
+ */
+ref <rxrpc::Call> af_rxrpc_endpoint::new_client_call(rxrpc::Call_params *params,
+                                                    const char *op_name,
+                                                    size_t size)
+{
+       return new af_rxrpc_call(params, op_name, size);
+}
+
+/*
+ * Dump data from an output buffer.
+ */
+static void dump_enc_buffers(rxrpc::Enc_buffer *&buf)
+{
+       unsigned int pos = 0, ioc, i;
+       bool nl = false;
+
+       for (ioc = 0; ioc < buf->ioc; ioc++) {
+               for (i = 0; i < buf->msg_iov[ioc].iov_len; i++) {
+                       if (!(pos & 3)) {
+                               if (!(pos & 31))
+                                       printf("%06x: ", pos);
+                               else
+                                       printf(" ");
+                       }
+
+                       printf("%02x", ((unsigned char *)buf->msg_iov[ioc].iov_base)[i]);
+                       pos++;
+                       nl = false;
+                       if ((pos & 31) == 0) {
+                               printf("\n");
+                               nl = true;
+                       }
+               }
+       }
+
+       if (!nl)
+               printf("\n");
+}
+
+/*
+ * Send buffered data.
+ */
+void af_rxrpc_call::send_data(rxrpc::Enc_buffer *buf, bool more)
+{
+       af_rxrpc_endpoint *endpoint = this->endpoint.cast<af_rxrpc_endpoint>();
+       struct msghdr msg;
+       unsigned char control[128];
+       unsigned int flags = 0, tmp, i;
+       socklen_t olen;
+       ssize_t slen;
+       size_t ctrllen;
+       char abuf[512];
+
+       if (rxrpc::debug_transport)
+               printf("\n");
+
+       /* Switch into encode state */
+       switch (state) {
+       case call_cl_not_started:
+       case call_sv_processing:
+               state = (enum call_state)(state + 1);
+               break;
+       case call_cl_encoding_params:
+       case call_sv_encoding_response:
+               break;
+       default:
+               fprintf(stderr, "RxRPC: Send in bad call state (%d)\n", state);
+               ::abort();
+       }
+
+       /* Request an operation */
+       ctrllen = 0;
+       RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)(rxrpc::refcount *)this);
+       if (exclusive)
+               RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_EXCLUSIVE_CALL);
+       if (upgrade_service)
+               RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_UPGRADE_SERVICE);
+       if (total_tx_size) {
+               debug("total Tx size %llu\n", total_tx_size);
+               RXRPC_ADD_TX_LENGTH(control, ctrllen, total_tx_size);
+       }
+
+       msg.msg_name            = &peer;
+       msg.msg_namelen         = peer_len;
+       msg.msg_iov             = buf ? buf->msg_iov : NULL;
+       msg.msg_iovlen          = buf ? buf->ioc : 0;
+       msg.msg_control         = control;
+       msg.msg_controllen      = ctrllen;
+       msg.msg_flags           = 0;
+
+       if (more)
+               flags |= MSG_MORE;
+
+       switch (peer.transport.family) {
+       case AF_INET:
+               inet_ntop(peer.transport.family, &peer.transport.sin.sin_addr,
+                         abuf, sizeof(abuf));
+               debug("Tx %u Addr %s:%u\n",
+                     peer.srx_service, abuf, ntohs(peer.transport.sin.sin_port));
+               break;
+       case AF_INET6:
+               inet_ntop(peer.transport.family, &peer.transport.sin6.sin6_addr,
+                         abuf, sizeof(abuf));
+               debug("Tx %u Addr [%s]:%u\n",
+                     peer.srx_service, abuf, ntohs(peer.transport.sin6.sin6_port));
+               break;
+       }
+
+       if (rxrpc::debug_transport) {
+               dump_cmsg("Tx", &msg);
+               for (i = 0; i < msg.msg_iovlen; i++)
+                       debug("Tx IOV[%02u] %04zu %p\n",
+                             i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base);
+       }
+       if (rxrpc::debug_data)
+               dump_enc_buffers(buf);
+
+       olen = sizeof(tmp);
+       getsockopt(endpoint->fd, SOL_SOCKET, SO_ERROR, (char *)&tmp, &olen);
+
+       /* Send the data */
+       slen = sendmsg(endpoint->fd, &msg, flags);
+       if (slen < 0) {
+               debug("SENDMSG: %m\n");
+               throw rxrpc::syserror("sendmsg");
+       }
+
+       debug("SENDMSG: %zd%s\n", slen, more ? " [more]" : "");
+       known_to_kernel = true;
+
+       if (!more) {
+               state = (enum call_state)(state + 1);
+               if (state == call_cl_encoding_params ||
+                   state == call_sv_encoding_response)
+                       state = (enum call_state)(state + 1);
+       }
+}
+
+/*
+ * End a split phase.
+ */
+void af_rxrpc_call::end_split(void)
+{
+       switch (state) {
+       case call_cl_encoding_params: {
+               send_data(NULL, false);
+               break;
+       }
+
+       case call_cl_receiving_response:
+               dec_func = rxrpc::receive_reply;
+               dec_func(this);
+               break;
+
+       default:
+               fprintf(stderr, "rxrpc::end_split() called in state %u\n",
+                       state);
+               return;
+       }
+}
+
+/*
+ * Determine the call a received message applies to by examining the anciliary
+ * data from recvmsg().
+ */
+ref<af_rxrpc_call> af_rxrpc_endpoint::get_call_from_anciliary(struct msghdr *msg)
+{
+       ref<af_rxrpc_call> call;
+       struct cmsghdr *cmsg;
+       unsigned long tmp;
+
+       if (rxrpc::debug_transport)
+               dump_cmsg("Rx", msg);
+
+       for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+               unsigned char *p = CMSG_DATA(cmsg);
+               int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+
+               if (cmsg->cmsg_level != SOL_RXRPC)
+                       continue;
+
+               switch (cmsg->cmsg_type) {
+               case RXRPC_USER_CALL_ID:
+                       if (n != sizeof(tmp)) {
+                               fprintf(stderr, "User call ID wrong size\n");
+                               abort();
+                       }
+
+                       memcpy(&tmp, p, sizeof(tmp));
+                       call.assign_cast((rxrpc::refcount *)tmp);
+                       return call;
+               }
+       }
+
+       throw std::runtime_error("recvmsg() didn't include a RXRPC_USER_CALL_ID cmsg");
+}
+
+/*
+ * Process the metadata given to us by recvmsg().
+ */
+void af_rxrpc_call::process_anciliary(struct msghdr *msg)
+{
+       struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *)msg->msg_name;
+       struct cmsghdr *cmsg;
+
+       if (service_id != srx->srx_service)
+               debug("Upgraded service to %u\n", srx->srx_service);
+       service_id = srx->srx_service; /* Changes if service upgrade */
+
+       if ((msg->msg_flags & MSG_EOR) && state != call_completed) {
+               debug("Rx EOR\n");
+               known_to_kernel = false;
+               state = call_completed;
+               completed = call_success;
+       }
+
+       for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+               unsigned char *p = CMSG_DATA(cmsg);
+               int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+
+               if (cmsg->cmsg_level == SOL_SOCKET) {
+                       switch (cmsg->cmsg_type) {
+                       case SO_TIMESTAMPNS_NEW:
+                               if (have_rx_timestamp)
+                                       continue;
+
+                               if (n != sizeof(rx_timestamp)) {
+                                       fprintf(stderr, "SO_TIMESTAMPNS wrong size\n");
+                                       std::abort();
+                               }
+
+                               memcpy(&rx_timestamp, p, n);
+                               have_rx_timestamp = true;
+                               debug("Rx tstamp\n");
+                               break;
+                       }
+               }
+
+               if (cmsg->cmsg_level != SOL_RXRPC)
+                       continue;
+
+               switch (cmsg->cmsg_type) {
+               case RXRPC_USER_CALL_ID:
+                       break;
+
+               case RXRPC_ABORT:
+                       if (n == 4)
+                               memcpy(&abort_code, p, 4);
+                       error = ECONNABORTED;
+                       state = call_completed;
+                       completed = call_remotely_aborted;
+                       break;
+
+               case RXRPC_NET_ERROR:
+                       if (n != sizeof(error))
+                               throw std::runtime_error("Can't parse RXRPC_NET_ERROR cmsg");
+                       memcpy(&error, p, 4);
+                       state = call_completed;
+                       completed = call_network_error;
+                       break;
+
+               case RXRPC_LOCAL_ERROR:
+                       if (n != sizeof(error))
+                               throw std::runtime_error("Can't parse RXRPC_LOCAL_ERROR cmsg");
+                       memcpy(&error, p, 4);
+                       state = call_completed;
+                       completed = call_local_error;
+                       break;
+
+               case RXRPC_BUSY:
+                       error = ECONNREFUSED;
+                       state = call_completed;
+                       completed = call_rejected_busy;
+                       break;
+
+               case RXRPC_ACK:
+                       if (state != call_sv_waiting_for_final_ack) {
+                               fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n",
+                                       state);
+                               std::abort();
+                       }
+                       state = call_completed;
+                       completed = call_success;
+                       break;
+
+               default:
+                       fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type);
+                       break;
+               }
+       }
+
+       if (state == call_completed) {
+               if (completion_func) {
+                       completion_func(this);
+                       completion_func = NULL;
+               } else {
+                       switch (completed) {
+                       case call_remotely_aborted:
+                               throw_abort_func(abort_code, op_name);
+                       case call_network_error:
+                               throw rxrpc::network_error(error);
+                       case call_local_error:
+                       case call_rejected_busy:
+                               throw rxrpc::syserror(error);
+                       default:
+                               break;
+                       }
+               }
+       }
+
+       debug("<--af_rxrpc_call::process_anciliary()\n");
+}
+
+/*
+ * Process a message from a call.
+ */
+void af_rxrpc_call::recv_msg(struct msghdr *msg, unsigned int len)
+{
+       af_rxrpc_endpoint *endpoint = this->endpoint.cast<af_rxrpc_endpoint>();
+       struct iovec *iov = msg->msg_iov;
+       unsigned int tmp;
+       int i;
+
+       process_anciliary(msg);
+
+       if (len > CONN_NR_IOV * RXGEN_BUFFER_SIZE) {
+               fprintf(stderr, "Too much data returned\n");
+               ::abort();
+       }
+
+       if (rxrpc::debug_data) {
+               tmp = len;
+               for (i = 0; i < CONN_NR_IOV && tmp > 0; i++) {
+                       unsigned int seg = (tmp <= iov[i].iov_len) ? tmp : iov[i].iov_len;
+                       bool nl = false;
+                       printf("Rx IOV[%u] %zu %u %p\n", i, iov[i].iov_len, seg, iov[i].iov_base);
+
+                       unsigned int j, s = seg; // <= 32 ? seg : 32;
+                       for (j = 0; j < s; j++) {
+                               if (!(j & 3)) {
+                                       if (!(j & 31))
+                                               printf("%06x: ", j);
+                                       else
+                                               printf(" ");
+                               }
+                               printf("%02x", ((unsigned char *)iov[i].iov_base)[j]);
+                               nl = false;
+                               if ((j & 31) == 31) {
+                                       printf("\n");
+                                       nl = true;
+                               }
+                       }
+                       if (!nl)
+                               printf("\n");
+                       tmp -= seg;
+               }
+       }
+
+       /* Update the buffers with the amount of data stored. */
+       for (i = 0; i < CONN_NR_IOV && len > 0; i++) {
+               rxrpc::Rx_buffer &b = endpoint->rx_spare_buffers.front();
+               unsigned int n;
+
+               b.index = dec_index++;
+
+               n = RXGEN_BUFFER_SIZE;
+               if (n > len)
+                       n = len;
+               debug("BUF[%u] +%-4u %zu {%u}\n",
+                     b.index, n, dec_amount + n, endpoint->rx_nr_buffers);
+               b.len = n;
+               b.buf = (unsigned char *)iov[i].iov_base;
+               iov[i].iov_base = NULL;
+
+               endpoint->rx_nr_buffers--;
+               std::list<rxrpc::Rx_buffer>::iterator to = endpoint->rx_spare_buffers.begin();
+               to++;
+               dec_buffers.splice(dec_buffers.end(),
+                                  endpoint->rx_spare_buffers,
+                                  endpoint->rx_spare_buffers.begin(),
+                                  to);
+               dec_amount += n;
+               len -= n;
+       }
+
+       dec_func(this);
+}
+
+/*
+ * Process a message.
+ */
+void af_rxrpc_endpoint::recv_msg(struct msghdr *msg, unsigned int len)
+{
+       af_rxrpc_call *call = get_call_from_anciliary(msg);
+
+       call->recv_msg(msg, len);
+}
+
+/*
+ * Receive data from a variety of calls in one go.
+ */
+void af_rxrpc_endpoint::receive(bool nowait)
+{
+       struct iovec *iov = rx_iov;
+       int count, i, j;
+
+       if (rxrpc::debug_transport)
+               printf("\n");
+
+       /* Tamp down the buffers to make sure we cycle through the memory */
+       j = 0;
+       for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) {
+               if (!iov[i].iov_base)
+                       continue;
+               iov[j++].iov_base = iov[i].iov_base;
+       }
+
+       for (; j < CONN_MSGS * CONN_NR_IOV; j++)
+               iov[j].iov_base = new unsigned char[RXGEN_BUFFER_SIZE];
+
+       /* Make sure we have sufficient buffer descriptors */
+       while (rx_nr_buffers < CONN_MSGS * CONN_NR_IOV) {
+               rx_spare_buffers.emplace_back();
+               rx_nr_buffers++;
+       }
+
+       /* Receive messages.  The descriptor array is already set up and gets
+        * reset as messages are processed.
+        */
+       count = recvmmsg(fd, rx_msgs, CONN_MSGS,
+                        nowait ? MSG_DONTWAIT : MSG_WAITFORONE,
+                        NULL);
+       debug("RECVMMSG: %d\n", count);
+       if (count == -1) {
+               if (errno != ENODATA)
+                       throw rxrpc::syserror("recvmmsg");
+               return;
+       }
+
+       for (i = 0; i < count; i++) {
+               struct mmsghdr *m = &rx_msgs[i];
+               struct msghdr *msg = &m->msg_hdr;
+
+               debug("Rx DATA: %d [fl:%x]\n", m->msg_len, msg->msg_flags);
+
+               recv_msg(msg, m->msg_len);
+
+               /* Reset the message descriptor. */
+               m->msg_hdr.msg_namelen  = sizeof(rx_peer[i]);
+               m->msg_hdr.msg_controllen = CONN_CMSG_SIZE;
+               m->msg_hdr.msg_flags    = 0;
+       }
+}
+
+/*
+ * Abort a call.
+ */
+void af_rxrpc_call::abort(int32_t abort_code)
+{
+       af_rxrpc_endpoint *endpoint = this->endpoint.cast<af_rxrpc_endpoint>();
+       struct msghdr msg;
+       size_t ctrllen;
+       unsigned char control[128];
+
+       if (known_to_kernel) {
+               memset(control, 0, sizeof(control));
+               ctrllen = 0;
+               RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)(rxrpc::refcount *)this);
+               RXRPC_ADD_ABORT(control, ctrllen, abort_code);
+
+               msg.msg_name            = &peer;
+               msg.msg_namelen         = peer_len;
+               msg.msg_iov             = NULL;
+               msg.msg_iovlen          = 0;
+               msg.msg_control         = control;
+               msg.msg_controllen      = ctrllen;
+               msg.msg_flags           = 0;
+
+               sendmsg(endpoint->fd, &msg, 0);
+               state = call_completed;
+               completed = call_locally_aborted;
+       }
+}
+
+/*
+ * End a call, aborting it if necessary.
+ */
+void af_rxrpc_call::end(int32_t abort_code)
+{
+       if (state != call_completed)
+               abort(abort_code);
+}
+
+/*
+ * Destroy a call.
+ */
+af_rxrpc_call::~af_rxrpc_call()
+{
+       if (known_to_kernel) {
+               fprintf(stderr, "Call still known to kernel\n");
+               end(RX_USER_ABORT);
+       }
+}
+
+af_rxrpc::~af_rxrpc()
+{
+}
+
+ref<rxrpc::Transport> create_af_rxrpc_transport()
+{
+       return new af_rxrpc();
+}
diff --git a/lib/rxrpc.H b/lib/rxrpc.H
new file mode 100644 (file)
index 0000000..4fc2830
--- /dev/null
@@ -0,0 +1,594 @@
+/* Core definitions.
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef RXRPC_HH
+#define RXRPC_HH
+
+#include <climits>
+#include <exception>
+#include <stdexcept>
+#include <string>
+#include <vector>
+#include <list>
+#include <cstring>
+#include <cerrno>
+#include <system_error>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <arpa/inet.h>
+#include <uuid/uuid.h>
+#include <linux/rxrpc.h>
+#include "pointer.H"
+
+namespace rxrpc {
+
+class Call;
+
+extern bool debug_buffers;
+extern bool debug_core;
+extern bool debug_data;
+extern bool debug_ops;
+extern bool debug_transport;
+extern bool debug_xdr;
+
+typedef unsigned int net_xdr_t;
+
+struct xdr_u64 { net_xdr_t hi, lo; };
+
+enum security_auth_level {
+       security_unset,
+       security_no_auth,
+       security_local_auth,
+       security_clear,
+       security_integrity_only,
+       security_encrypt,
+};
+
+enum error_source {
+       error_none,
+       error_invalid_argument,
+       error_remote_abort,
+       error_peer_busy,
+       error_from_system,
+       error_from_network,
+       error_from_parameters,
+};
+
+/*
+ * Local transport endpoint.
+ */
+class Endpoint : public refcount {
+public:
+       struct sockaddr_rxrpc   address;
+
+       Endpoint(const struct sockaddr_rxrpc *local_addr)
+               {
+                       if (local_addr)
+                               memcpy(&address, local_addr, sizeof(address));
+                       else
+                               memset(&address, 0, sizeof(address));
+               }
+
+       virtual ~Endpoint() = 0;
+       virtual void receive(bool nowait=false) = 0;
+       virtual ref<Call> new_client_call(struct Call_params *params,
+                                         const char *op_name,
+                                         size_t size) = 0;
+};
+
+/*
+ * Security details to use.
+ */
+class Security : public refcount {
+public:
+       std::string                     key_name; /* Name of key to use */
+       enum security_auth_level        level;
+       Security(const std::string &_key_name,
+                enum rxrpc::security_auth_level _level)
+               : key_name(_key_name), level(_level)
+               {}
+       virtual ~Security();
+};
+
+/*
+ * Call parameters.
+ */
+struct Call_params {
+       ref<Endpoint>           endpoint;
+       ref<Security>           security;
+       struct sockaddr_rxrpc   peer;
+       socklen_t               peer_len;
+       bool                    exclusive;
+       bool                    upgrade_service;
+       unsigned short          service_id_used;
+
+       Call_params()
+               {
+                       memset(&peer, 0, sizeof(peer));
+                       peer_len = 0;
+                       exclusive = false;
+                       upgrade_service = false;
+                       service_id_used = 0;
+               }
+};
+
+class Resv {
+public:
+       unsigned int            max_ioc;        /* Size of msg_iov[] */
+       bool                    flat;           /* T to do it all in one iovec */
+       size_t                  alloc;          /* Size of buffer allocated */
+       size_t                  additional;     /* Size of additional content */
+};
+
+/*
+ * RxRPC encoded data buffer.
+ */
+class Enc_buffer : public refcount {
+public:
+       net_xdr_t               *enc_buf;       /* Encoding buffer */
+       net_xdr_t               *enc_buf_p;     /* Running pointer into encoding buffer */
+       net_xdr_t               *enc_buf_end;   /* Encoding buffer limit */
+       struct iovec            *msg_iov;       /* For sendmsg()'s msghdr */
+       unsigned int            ioc;            /* Occupancy of msg_iov[] */
+       bool                    flat;           /* T to do it all in one iovec */
+       size_t                  size;           /* Size of content, including additional iovecs */
+       Enc_buffer()
+               {
+                       enc_buf = NULL;
+                       enc_buf_p = NULL;
+                       enc_buf_end = NULL;
+                       msg_iov = NULL;
+                       ioc = 0;
+                       flat = false;
+                       size = 0;
+               }
+       ~Enc_buffer();
+};
+
+/*
+ * RxRPC received data buffer.
+ */
+class Rx_buffer {
+public:
+#define RXGEN_BUFFER_SIZE      1024
+       unsigned char           *buf;
+       unsigned short          len;            /* Amount of data in buffer   */
+       unsigned short          used;           /* How much data used */
+       unsigned int            index;          /* Index in buffer list */
+       bool                    no_delete;      /* Don't delete the buffer */
+
+       Rx_buffer()
+               {
+                       buf = NULL;
+                       len = 0;
+                       used = 0;
+                       index = 0;
+                       no_delete = false;
+               }
+       ~Rx_buffer()
+               {
+                       if (!no_delete)
+                               delete [] buf;
+                       buf = NULL;
+               }
+};
+
+/*
+ * Received data queue.
+ */
+class Rx_queue {
+public:
+       const char              *op_name;       /* Name of the RPC operation */
+       bool                    client_side;    /* True if client side of call */
+       unsigned int            dec_index;
+       std::list<Rx_buffer>    dec_buffers;    /* List of received buffers */
+       size_t                  dec_amount;     /* Amount of data in dec_buffers */
+
+       Rx_queue(const char *opname = NULL);
+       ~Rx_queue();
+       static void read_copy(void *buffer, size_t offset, const void *data, size_t len);
+       void read(void *buffer, size_t buffer_size,
+                 void (*copy)(void *buffer, size_t offset, const void *data, size_t len)
+                 = read_copy);
+       virtual void overrun();
+};
+
+/*
+ * RxRPC call.
+ */
+class Call : public refcount, public Rx_queue {
+public:
+       enum call_state {
+               call_cl_not_started,
+               call_cl_encoding_params,
+               call_cl_receiving_response,
+
+               call_sv_not_started,
+               call_sv_receiving_params,
+               call_sv_wait_for_no_MSG_MORE,
+               call_sv_processing,
+               call_sv_encoding_response_split,
+               call_sv_encoding_response,
+               call_sv_response_encoded,
+               call_sv_waiting_for_final_ack,
+               call_completed,
+       };
+
+       enum call_completion {
+               call_incomplete,
+               call_success,
+               call_remotely_aborted,
+               call_locally_aborted,
+               call_network_error,
+               call_local_error,
+               call_rejected_busy,
+       };
+
+       ref<Endpoint>           endpoint;
+       struct sockaddr_rxrpc   peer;
+       socklen_t               peer_len;
+       unsigned short          service_id;
+       enum call_state         state;
+       enum call_completion    completed;
+       uint32_t                abort_code;
+       unsigned int            error;
+
+       bool                    split;          /* True if split-mode call */
+       bool                    exclusive;
+       bool                    upgrade_service;
+       bool                    have_rx_timestamp;
+
+       struct timespec         rx_timestamp;   /* Timestamp of first received packet */
+
+       void (*completion_func)(Call *);
+       void (*throw_abort_func)(int ac, const char *op);
+
+       /* Transmission phase state */
+       unsigned long long      total_tx_size;  /* Total expected transmission size */
+       uint32_t                encode_error;
+
+       /* Receive phase state */
+       void (*dec_func)(Call *);
+
+       Call();
+       Call(struct Call_params *params, const char *op_name, size_t size);
+       virtual ~Call();
+
+       virtual void send_data(Enc_buffer *buf, bool more=false) = 0;
+       virtual void end(int32_t abort_code=0) = 0;
+       virtual void abort(int32_t abort_code) = 0;
+       virtual void end_split(void) = 0;
+
+       void dec_underrun(size_t);
+       void request_received();
+};
+
+extern void find_transport(void);
+extern ref<Endpoint> new_local_endpoint(const struct sockaddr_rxrpc *local_sa = NULL,
+                                       rxrpc::Security *security = NULL);
+extern ref<Security> new_security(const std::string &key_name,
+                                 enum security_auth_level level);
+
+extern ref<Enc_buffer> alloc_enc_buffer(Resv &resv);
+extern ref<Call> make_call(Call_params *z_params,
+                          const char *op_name,
+                          Enc_buffer *z_request);
+extern void receive_reply(Call *call);
+
+/*
+ * UUID.
+ */
+struct Uuid {
+       uuid_t  uuid;
+       Uuid() { memset(uuid, 0, sizeof(uuid)); }
+};
+
+/*
+ * Opaque data.
+ */
+struct Opaque {
+       size_t  buffer_size;
+       void    *buffer;
+       bool    del_buffer;
+
+       inline Opaque(void)
+               {
+                       buffer_size = 0;
+                       buffer = NULL;
+                       del_buffer = false;
+               }
+       inline ~Opaque(void)
+               {
+                       if (del_buffer)
+                               free(buffer);
+               }
+       inline size_t size(void) const { return buffer_size; }
+       inline void reserve(size_t size)
+               {
+                       if (buffer && del_buffer)
+                               free(buffer);
+                       buffer = malloc(size + 1);
+                       ((unsigned char *)buffer)[size] = 0;
+                       del_buffer = true;
+               }
+};
+
+/*
+ * XDR encoding size calculation routines
+ */
+static inline void reserve(Resv &resv)
+{
+       resv.alloc += 4;
+}
+
+static inline void reserve_a32(Resv &resv, unsigned int n)
+{
+       resv.alloc += 4 * n;
+}
+
+static inline void reserve_u64(Resv &resv)
+{
+       resv.alloc += 8;
+}
+
+static inline void reserve_a64(Resv &resv, unsigned int n)
+{
+       resv.alloc += 8 * n;
+}
+
+static inline size_t xdr_round_up(unsigned int size)
+{
+       return (size + sizeof(net_xdr_t) - 1) & ~(sizeof(net_xdr_t) - 1);
+}
+
+static inline bool reserve_opaque_inline(bool flat, unsigned int size, unsigned int max)
+{
+       return flat || (max <= 1024 && size <= 1024);
+}
+
+static inline void reserve_opaque(Resv &resv, const Opaque &data,
+                                 unsigned int max)
+{
+       size_t size = data.size();
+       size_t rs = xdr_round_up(size);
+
+       reserve(resv);
+       if (reserve_opaque_inline(resv.flat, size, max)) {
+               resv.alloc += rs;
+       } else {
+               if ((size & 3) == 0) {
+                       resv.additional += rs;
+               } else {
+                       resv.alloc += 4;
+                       resv.additional += rs - 4;
+               }
+               resv.max_ioc += 2;
+       }
+}
+
+static inline void reserve_string(Resv &resv, const std::string &s, unsigned int max)
+{
+       Opaque op;
+
+       op.buffer_size = s.size(),
+       reserve_opaque(resv, op, max);
+}
+
+static inline void reserve_opr_uuid(Resv &resv, const Uuid &uuid)
+{
+       resv.alloc += 16;
+}
+
+/*
+ * XDR encoding routines
+ */
+extern void enc_data(Enc_buffer *buf, const void *data,
+                    unsigned int len, unsigned int max);
+
+static inline void enc(Enc_buffer *z_buf, uint32_t val)
+{
+       *z_buf->enc_buf_p++ = htonl(val);
+}
+
+static inline void enc_u64(Enc_buffer *z_buf, uint64_t x)
+{
+       enc(z_buf, x >> 32);
+       enc(z_buf, x);
+}
+
+static inline void enc_string(Enc_buffer *buf, const std::string &s, unsigned int max)
+{
+       enc_data(buf, s.c_str(), s.size(), max);
+}
+
+static inline void enc_opaque(Enc_buffer *buf, const Opaque &data, unsigned int max)
+{
+       enc_data(buf, data.buffer, data.size(), max);
+}
+
+static inline void enc_block(Enc_buffer *buf, const void *p, size_t s)
+{
+       memcpy(buf->enc_buf_p, p, s);
+       buf->enc_buf_p += s / 4;
+}
+
+static inline void enc_uuid(Enc_buffer *buf, const Uuid &u)
+{
+       enc_block(buf, u.uuid, sizeof(u.uuid));
+}
+
+static inline void seal_buffer(Enc_buffer *buf)
+{
+       unsigned int len;
+
+       /* call->ioc points to the iovec being filled, so we need to advance it
+        * if that has anything in it.
+        */
+       len = (unsigned long)buf->enc_buf_p - (unsigned long)buf->msg_iov[buf->ioc].iov_base;
+       if (len) {
+               buf->msg_iov[buf->ioc].iov_len = len;
+               buf->ioc++;
+       }
+}
+
+/*
+ * XDR decoding routines
+ */
+extern uint32_t dec(Rx_queue &rxq);
+extern uint32_t dec(Rx_queue &rxq, size_t limit);
+extern void dec_string(Rx_queue &rxq, std::string &, size_t max = UINT_MAX);
+extern void dec_opaque(Rx_queue &rxq, Opaque &, size_t max = UINT_MAX);
+extern void *dec_blob(Rx_queue &rxq, size_t size);
+
+static inline uint64_t dec_u64(Rx_queue &rxq)
+{
+       uint64_t x;
+       x  = (uint64_t)dec(rxq) << 32;
+       x |= (uint64_t)dec(rxq);
+       return x;
+}
+
+/*
+ * Error handling.
+ */
+struct nomem : std::exception {
+};
+
+class syserror : public std::system_error {
+protected:
+       syserror() {}
+public:
+       syserror(const char *s)
+               : system_error(std::error_code(errno, std::generic_category()), s)
+               {
+               }
+       syserror(const std::string &s)
+               : system_error(std::error_code(errno, std::generic_category()), s)
+               {
+               }
+       syserror(int err, const char *s)
+               : system_error(std::error_code(err, std::generic_category()), s)
+               {
+               }
+       syserror(int err, const std::string &s)
+               : system_error(std::error_code(err, std::generic_category()), s)
+               {
+               }
+       syserror(int err)
+               : system_error(std::error_code(err, std::generic_category()))
+               {
+               }
+};
+
+class network_error : public syserror {
+public:
+       __attribute__((format(printf,2,3)))
+       network_error(const char *fmt, ...)
+               {
+               }
+       network_error(int err)
+               : syserror(err)
+               {
+               }
+};
+
+class rx_abort : public std::runtime_error {
+protected:
+       int             abort_code;
+       const char      *abort_name;
+       const char      *op_name;
+
+       rx_abort(int abort, const char *name, const char *desc, const char *op);
+public:
+       int get_abort_code() const { return abort_code; }
+};
+
+class unknown_abort : public rx_abort {
+public:
+       unknown_abort(int abort_code, const char *op = NULL)
+               : rx_abort(abort_code, "UnknownAbort", "", op) {}
+};
+
+struct AbortRx : rx_abort {
+       AbortRx(int ac, const char *sym, const char *desc, const char *op)
+               : rx_abort(ac, sym, desc, op) {}
+};
+struct AbortRX_CALL_DEAD : AbortRx {
+       AbortRX_CALL_DEAD(const char *op = NULL)
+               : AbortRx(RX_CALL_DEAD, "RX_CALL_DEAD", "", op) {}
+};
+struct AbortRX_INVALID_OPERATION : AbortRx {
+       AbortRX_INVALID_OPERATION(const char *op = NULL)
+               : AbortRx(RX_INVALID_OPERATION, "RX_INVALID_OPERATION", "", op) {}
+};
+struct AbortRX_CALL_TIMEOUT : AbortRx {
+       AbortRX_CALL_TIMEOUT(const char *op = NULL)
+               : AbortRx(RX_CALL_TIMEOUT, "RX_CALL_TIMEOUT", "", op) {}
+};
+struct AbortRX_EOF : AbortRx {
+       AbortRX_EOF(const char *op = NULL)
+               : AbortRx(RX_EOF, "RX_EOF", "", op) {}
+};
+struct AbortRX_PROTOCOL_ERROR : AbortRx {
+       AbortRX_PROTOCOL_ERROR(const char *op = NULL)
+               : AbortRx(RX_PROTOCOL_ERROR, "RX_PROTOCOL_ERROR", "", op) {}
+};
+struct AbortRX_USER_ABORT : AbortRx {
+       AbortRX_USER_ABORT(const char *op = NULL)
+               : AbortRx(RX_USER_ABORT, "RX_USER_ABORT", "", op) {}
+};
+struct AbortRX_ADDRINUSE : AbortRx {
+       AbortRX_ADDRINUSE(const char *op = NULL)
+               : AbortRx(RX_ADDRINUSE, "RX_ADDRINUSE", "", op) {}
+};
+struct AbortRX_DEBUGI_BADTYPE : AbortRx {
+       AbortRX_DEBUGI_BADTYPE(const char *op = NULL)
+               : AbortRx(RX_DEBUGI_BADTYPE, "RX_DEBUGI_BADTYPE", "", op) {}
+};
+
+struct AbortRXGEN_CC_MARSHAL   : AbortRx {
+       AbortRXGEN_CC_MARSHAL(const char *op = NULL)
+               : AbortRx(RXGEN_CC_MARSHAL, "RXGEN_CC_MARSHAL", "", op) {}
+};
+struct AbortRXGEN_CC_UNMARSHAL : AbortRx {
+       AbortRXGEN_CC_UNMARSHAL(const char *op = NULL)
+               : AbortRx(RXGEN_CC_UNMARSHAL, "RXGEN_CC_UNMARSHAL", "", op) {}
+};
+struct AbortRXGEN_SS_MARSHAL   : AbortRx {
+       AbortRXGEN_SS_MARSHAL(const char *op = NULL)
+               : AbortRx(RXGEN_SS_MARSHAL, "RXGEN_SS_MARSHAL", "", op) {}
+};
+struct AbortRXGEN_SS_UNMARSHAL : AbortRx {
+       AbortRXGEN_SS_UNMARSHAL(const char *op = NULL)
+               : AbortRx(RXGEN_SS_UNMARSHAL, "RXGEN_SS_UNMARSHAL", "", op) {}
+};
+struct AbortRXGEN_DECODE               : AbortRx {
+       AbortRXGEN_DECODE(const char *op = NULL)
+               : AbortRx(RXGEN_DECODE, "RXGEN_DECODE", "", op) {}
+};
+struct AbortRXGEN_OPCODE               : AbortRx {
+       AbortRXGEN_OPCODE(const char *op = NULL)
+               : AbortRx(RXGEN_OPCODE, "RXGEN_OPCODE", "", op) {}
+};
+struct AbortRXGEN_SS_XDRFREE   : AbortRx {
+       AbortRXGEN_SS_XDRFREE(const char *op = NULL)
+               : AbortRx(RXGEN_SS_XDRFREE, "RXGEN_SS_XDRFREE", "", op) {}
+};
+struct AbortRXGEN_CC_XDRFREE   : AbortRx {
+       AbortRXGEN_CC_XDRFREE(const char *op = NULL)
+               : AbortRx(RXGEN_CC_XDRFREE, "RXGEN_CC_XDRFREE", "", op) {}
+};
+
+extern void throw_abort(int abort_code, const char *op=NULL);
+
+} /* end namespace rxrpc */
+
+#endif /* RXRPC_HH */
diff --git a/lib/rxrpc_core.C b/lib/rxrpc_core.C
new file mode 100644 (file)
index 0000000..9b0b572
--- /dev/null
@@ -0,0 +1,181 @@
+/* rxrpc core
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <iostream>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <cerrno>
+#include "rxrpc.H"
+#include "transport.H"
+
+using rxrpc::ref;
+
+bool rxrpc::debug_buffers;             /* Debug buffer handling */
+bool rxrpc::debug_core;                        /* Debug core management */
+bool rxrpc::debug_data;                        /* Dump data */
+bool rxrpc::debug_ops;                 /* Debug operations */
+bool rxrpc::debug_transport;           /* Debug transport */
+bool rxrpc::debug_xdr;                 /* Debug XDR encoding/decoding */
+
+#define debug(fmt, ...) \
+       do { if (rxrpc::debug_core) printf("CORE: " fmt, ## __VA_ARGS__); } while (0)
+#define debug_op(fmt, ...) \
+       do { if (rxrpc::debug_ops) printf("OP: " fmt, ## __VA_ARGS__); } while (0)
+
+rxrpc::refcount::~refcount() {}
+rxrpc::Security::~Security() {}
+rxrpc::Transport::~Transport() {}
+
+static ref<rxrpc::Transport> rxrpc_transport;
+
+void rxrpc::find_transport(void)
+{
+       if (!rxrpc_transport)
+               rxrpc_transport = create_af_rxrpc_transport();
+}
+
+/*
+ * Set up a new transport
+ */
+ref<rxrpc::Endpoint> rxrpc::new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+                                              rxrpc::Security *security)
+{
+       if (!rxrpc_transport)
+               throw std::invalid_argument("No Rx transport loaded");
+
+       return rxrpc_transport->new_local_endpoint(local_addr, security);
+}
+
+/*
+ * Destroy a local endpoint.
+ */
+rxrpc::Endpoint::~Endpoint()
+{
+}
+
+/*
+ * Set up a new security context.
+ */
+ref<rxrpc::Security> rxrpc::new_security(const std::string &key_name,
+                                        enum rxrpc::security_auth_level level)
+{
+       if (level < security_unset ||
+           level > security_encrypt)
+               throw std::invalid_argument("Invalid security level");
+
+       return rxrpc_transport->new_security(key_name, level);
+}
+
+/*
+ * Construct a server call..
+ */
+rxrpc::Call::Call()
+{
+       memset(&peer, 0, sizeof(peer));
+       peer_len                = 0;
+       service_id              = 0;
+       state                   = call_cl_not_started;
+       completed               = call_incomplete;
+       abort_code              = 0;
+       error                   = 0;
+       client_side             = false;
+       split                   = false;
+       exclusive               = false;
+       upgrade_service         = false;
+       have_rx_timestamp       = false;
+       memset(&rx_timestamp, 0, sizeof(rx_timestamp));
+       completion_func         = NULL;
+       throw_abort_func        = rxrpc::throw_abort;
+       total_tx_size           = 0;
+       encode_error            = 0;
+       dec_func                = NULL;
+}
+
+/*
+ * Construct a client call.
+ */
+rxrpc::Call::Call(struct Call_params *params, const char *op_name, size_t size)
+       : Rx_queue(op_name)
+{
+       endpoint                = params->endpoint;
+       peer                    = params->peer;
+       peer_len                = sizeof(peer);
+       service_id              = params->peer.srx_service;;
+       state                   = call_cl_not_started;
+       completed               = call_incomplete;
+       abort_code              = 0;
+       error                   = 0;
+       client_side             = true;
+       split                   = false;
+       exclusive               = params->exclusive;
+       upgrade_service         = params->upgrade_service;
+       have_rx_timestamp       = false;
+       memset(&rx_timestamp, 0, sizeof(rx_timestamp));
+       completion_func         = NULL;
+       throw_abort_func        = rxrpc::throw_abort;
+       total_tx_size           = size;
+       encode_error            = 0;
+       dec_func                = rxrpc::receive_reply;
+}
+
+/*
+ * Set up a client call and prepare for encoding.
+ */
+ref<rxrpc::Call> rxrpc::make_call(struct Call_params *params,
+                                 const char *op_name,
+                                 Enc_buffer *buf)
+{
+       ref<rxrpc::Call> call;
+
+       if (params->peer_len < 5 * 2 ||
+           params->peer_len > sizeof(call->peer))
+               throw std::invalid_argument("Peer address too small");
+
+       if (params->peer.srx_family != AF_RXRPC)
+               throw std::invalid_argument("Peer address family unsupported");
+
+       call = params->endpoint->new_client_call(params, op_name, buf->size);
+
+       debug_op("Send %s\n", op_name);
+       return call;
+}
+
+/*
+ * Call destructor
+ */
+rxrpc::Call::~Call()
+{
+}
+
+/*
+ * Reply decoder.
+ */
+void rxrpc::receive_reply(Call *call)
+{
+       if (call->state == rxrpc::Call::call_completed &&
+           call->completed == rxrpc::Call::call_success)
+               debug("Reply received\n");
+}
+
+/*
+ * End of reception of request.  There should be no more data in the buffers.
+ */
+void rxrpc::Call::request_received()
+{
+       switch (state) {
+       case call_sv_receiving_params:
+       default:
+               fprintf(stderr, "rxrpc::request_received() called in state %u\n",
+                       state);
+               break;
+       }
+}
diff --git a/lib/rxrpc_xdr.C b/lib/rxrpc_xdr.C
new file mode 100644 (file)
index 0000000..00a272f
--- /dev/null
@@ -0,0 +1,325 @@
+/* XDR coding and decoding.
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <iostream>
+#include <fmt/core.h>
+#include "rxrpc.H"
+
+using rxrpc::ref;
+
+namespace rxrpc {
+
+static inline size_t calc_padding(size_t x)
+{
+       return (4 - (x & 3)) & 3;
+}
+
+static void enc_advance_iov_by_2(Enc_buffer *);
+static void dec_string_copy(void *, size_t, const void *, size_t);
+static void dec_ignore_padding(void *, size_t, const void *, size_t);
+}
+
+#define debug(fmt, ...) \
+       do { if (rxrpc::debug_xdr) printf("XDR: " fmt, ## __VA_ARGS__); } while (0)
+#define debug_buffer(fmt, ...) \
+       do { if (rxrpc::debug_buffers) printf("BUF: " fmt, ## __VA_ARGS__); } while (0)
+
+#define UAENOMEM 0x2f6df0b
+
+/*
+ * Set up an encoded buffer.
+ */
+ref<rxrpc::Enc_buffer> rxrpc::alloc_enc_buffer(Resv &resv)
+{
+       ref<Enc_buffer> buf;
+       size_t alloc;
+
+       if (resv.max_ioc == 0)
+               abort();
+
+       buf = new Enc_buffer;
+       buf->msg_iov = new iovec[resv.max_ioc];
+
+       alloc = (resv.alloc + 3) / 4;
+       buf->enc_buf = new net_xdr_t[alloc];
+
+       buf->flat = resv.flat;
+       buf->size = resv.alloc + resv.additional;
+       buf->enc_buf_p = buf->enc_buf;
+       buf->enc_buf_end = buf->enc_buf + alloc;
+       buf->msg_iov[0].iov_base = buf->enc_buf_p;
+       debug_buffer("Tx ALLOC s=%zx a=%zx ioc=%u\n", buf->size, resv.alloc, resv.max_ioc);
+       return buf;
+}
+
+rxrpc::Enc_buffer::~Enc_buffer()
+{
+       delete [] msg_iov;
+       delete [] enc_buf;
+}
+
+/*
+ * Advance the iov array by two slots, leaving a slot for a blob.
+ */
+static void rxrpc::enc_advance_iov_by_2(Enc_buffer *buf)
+{
+       unsigned int ioc = buf->ioc;
+
+       /* Note that iov_base may not be 4-bytes aligned if it starts with blob
+        * padding.
+        */
+       buf->msg_iov[ioc].iov_len =
+               (unsigned long)buf->enc_buf_p - (unsigned long)buf->msg_iov[ioc].iov_base;
+
+       ioc += 2;
+       buf->msg_iov[ioc].iov_base = buf->enc_buf_p;
+       buf->ioc = ioc;
+}
+
+/*
+ * Encode an opaque blob to XDR.  We copy in the blob if it's small, otherwise
+ * we assign it its own iov slot.
+ */
+void rxrpc::enc_data(Enc_buffer *buf,
+                    const void *blob, unsigned int size, unsigned int max)
+{
+       unsigned int ioc;
+       size_t padsize = calc_padding(size);
+       size_t total = size + padsize;
+
+       enc(buf, size);
+
+       if (size == 0) {
+               /* Nothing to store */
+       } else if (reserve_opaque_inline(buf->flat, size, max)) {
+               if (size & 3)
+                       buf->enc_buf_p[size >> 2] = 0; /* Clear padding */
+               memcpy(buf->enc_buf_p, blob, size);
+               buf->enc_buf_p += total / sizeof(net_xdr_t);
+       } else {
+               enc_advance_iov_by_2(buf);
+               ioc = buf->ioc;
+               buf->msg_iov[ioc - 1].iov_len = size;
+               buf->msg_iov[ioc - 1].iov_base = (void *)blob;
+               if (padsize > 0) {
+                       enc(buf, 0);
+                       buf->msg_iov[ioc].iov_base += 4 - padsize;
+                       buf->msg_iov[ioc].iov_len += padsize;
+               }
+       }
+}
+
+void rxrpc::Call::dec_underrun(size_t size)
+{
+       if (client_side)
+               throw std::runtime_error(
+                       fmt::format("rxrpc XDR decoder underrun ({:x}/{:x}) on {}",
+                                   dec_amount, size, op_name));
+       else
+               throw AbortRXGEN_SS_UNMARSHAL(op_name);
+}
+
+rxrpc::Rx_queue::Rx_queue(const char *opname) : op_name(opname)
+{
+       dec_index = 0;
+       dec_amount = 0;
+}
+
+rxrpc::Rx_queue::~Rx_queue()
+{
+}
+
+
+void rxrpc::Rx_queue::overrun()
+{
+#if 0
+       if (state == call_completed &&
+           completed != call_success)
+               return;
+#endif
+       printf("Partial fail\n");
+}
+
+/*
+ * Default copier for rxrpc::Call::read.
+ */
+void rxrpc::Rx_queue::read_copy(void *buffer, size_t offset, const void *data, size_t len)
+{
+       memcpy(buffer + offset, data, len);
+}
+
+/*
+ * Read data from the receive buffers.
+ */
+void rxrpc::Rx_queue::read(void *buffer, size_t buffer_size,
+                          void (*copy)(void *buffer, size_t offset, const void *data, size_t len))
+{
+       unsigned int seg, i;
+       size_t offset = 0;
+
+       debug("READ %zu/%zu\n", buffer_size, dec_amount);
+
+       if (buffer_size > dec_amount)
+               return overrun();
+
+       while (offset < buffer_size && !dec_buffers.empty()) {
+               Rx_buffer &b = dec_buffers.front();
+
+               seg = b.len - b.used;
+               if (seg > buffer_size - offset)
+                       seg = buffer_size - offset;
+
+               debug("B[%u] %zu/%zu from %u/%zu\n",
+                     b.index, offset, buffer_size, seg, dec_amount);
+
+               (*copy)(buffer, offset, b.buf + b.used, seg);
+               offset += seg;
+               b.used += seg;
+               dec_amount -= seg;
+
+               if (b.used == b.len) {
+                       dec_buffers.pop_front();
+               } else {
+                       if (offset != buffer_size)
+                               ::abort();
+               }
+       }
+
+       if (offset < buffer_size) {
+               fprintf(stderr, "Inconsistent amount of buffered rx data\n");
+               ::abort();
+       }
+
+       if (dec_buffers.empty() && dec_amount != 0) {
+               fprintf(stderr, "dec_amount should be zero\n");
+               ::abort();
+       }
+
+       if (debug_xdr) {
+               seg = buffer_size <= 32 ? buffer_size : 32;
+               debug("");
+               for (i = 0; i < seg; i++)
+                       printf("%02x", ((unsigned char *)buffer)[i]);
+               printf("\n");
+       }
+}
+
+/*
+ * Decode a 32-bit integer.
+ */
+uint32_t rxrpc::dec(Rx_queue &rxq)
+{
+       uint32_t x;
+
+       rxq.read(&x, sizeof(x));
+       return ntohl(x);
+}
+
+/*
+ * Decode a 32-bit integer and check limit.
+ */
+uint32_t rxrpc::dec(Rx_queue &rxq, size_t limit)
+{
+       uint32_t x;
+
+       x = dec(rxq);
+       if (x > limit) {
+               if (rxq.client_side) {
+                       throw std::runtime_error(
+                               fmt::format("XDR: Decoded value exceeded limit ({:x}/{:x}) on {}",
+                                           x, limit, rxq.op_name));
+               } else {
+                       throw AbortRXGEN_SS_UNMARSHAL(rxq.op_name);
+               }
+       }
+
+       return x;
+}
+
+static void rxrpc::dec_string_copy(void *buffer, size_t offset, const void *data, size_t len)
+{
+       std::string *s = (std::string *)buffer;
+
+       s->insert(offset, (const char *)data, len);
+}
+
+static void rxrpc::dec_ignore_padding(void *buffer, size_t offset, const void *data, size_t len)
+{
+}
+
+/*
+ * Decode a string.
+ */
+void rxrpc::dec_string(Rx_queue &rxq, std::string &s, size_t max)
+{
+       size_t len;
+
+       len = dec(rxq, max);
+       s.reserve(len);
+       rxq.read(&s, len, dec_string_copy);
+       len &= 3;
+       if (len)
+               rxq.read(NULL, 4 - len, dec_ignore_padding);
+}
+
+/*
+ * Decode an opaque
+ */
+void rxrpc::dec_opaque(Rx_queue &rxq, Opaque &data, size_t max)
+{
+       size_t len;
+
+       len = dec(rxq, max);
+       data.reserve(len);
+       rxq.read(data.buffer, len);
+       data.buffer_size = len;
+       len &= 3;
+       if (len)
+               rxq.read(NULL, 4 - len, dec_ignore_padding);
+}
+
+/*
+ * Construct an abort exception class.
+ */
+rxrpc::rx_abort::rx_abort(int abort, const char *name, const char *desc, const char *op)
+       : std::runtime_error(fmt::format("RPC {} aborted with {}", op, name)),
+         abort_code(abort), abort_name(name), op_name(op)
+{
+       if (debug_ops)
+               printf("OP: Rx ABORT: %d %s\n", abort_code, abort_name);
+}
+
+/*
+ * Throw an exception appropriate to the abort code.
+ */
+void rxrpc::throw_abort(int abort_code, const char *op)
+{
+       switch (abort_code) {
+       case RX_CALL_DEAD:              throw AbortRX_CALL_DEAD(op);
+       case RX_INVALID_OPERATION:      throw AbortRX_INVALID_OPERATION(op);
+       case RX_CALL_TIMEOUT:           throw AbortRX_CALL_TIMEOUT(op);
+       case RX_EOF:                    throw AbortRX_EOF(op);
+       case RX_PROTOCOL_ERROR:         throw AbortRX_PROTOCOL_ERROR(op);
+       case RX_USER_ABORT:             throw AbortRX_USER_ABORT(op);
+       case RX_ADDRINUSE:              throw AbortRX_ADDRINUSE(op);
+       case RX_DEBUGI_BADTYPE:         throw AbortRX_DEBUGI_BADTYPE(op);
+       case RXGEN_CC_MARSHAL:          throw AbortRXGEN_CC_MARSHAL(op);
+       case RXGEN_CC_UNMARSHAL:        throw AbortRXGEN_CC_UNMARSHAL(op);
+       case RXGEN_SS_MARSHAL:          throw AbortRXGEN_SS_MARSHAL(op);
+       case RXGEN_SS_UNMARSHAL:        throw AbortRXGEN_SS_UNMARSHAL(op);
+       case RXGEN_DECODE:              throw AbortRXGEN_DECODE(op);
+       case RXGEN_OPCODE:              throw AbortRXGEN_OPCODE(op);
+       case RXGEN_SS_XDRFREE:          throw AbortRXGEN_SS_XDRFREE(op);
+       case RXGEN_CC_XDRFREE:          throw AbortRXGEN_CC_XDRFREE(op);
+       default:
+               throw unknown_abort(abort_code, op);
+       }
+}
diff --git a/lib/transport.H b/lib/transport.H
new file mode 100644 (file)
index 0000000..7887261
--- /dev/null
@@ -0,0 +1,75 @@
+/* Access points into the dynamically loaded Rx transport module
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef _RXRPC_TRANSPORT_H
+#define _RXRPC_TRANSPORT_H
+
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include "rxrpc.H"
+
+namespace rxrpc {
+
+enum recv_condition {
+       recv_more,
+       recv_final_ack,
+       recv_complete,
+       recv_complete_now_send,
+       recv_abort,
+       recv_net_error,
+       recv_local_error,
+       recv_busy,
+};
+
+#if 0
+typedef void (*incoming_call_func_t)(struct service *service,
+                                    void *caller_data,
+                                    const struct sockaddr_rxrpc *peer,
+                                    socklen_t *peerlen);
+#endif
+
+class Transport : public refcount {
+public:
+       virtual std::string name() const = 0;
+
+       virtual ref<Endpoint> new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+                                                Security *security) = 0;
+
+       virtual ref<Security> new_security(const std::string &key_name,
+                                          enum rxrpc::security_auth_level level) = 0;
+
+#if 0
+       virtual int new_service(Endpoint *&endpoint,
+                               Service *service,
+                               incoming_call_func_t new_call_func,
+                               void *caller_data,
+                               uint16_t local_port,
+                               uint16_t local_service,
+                               Security *permits) = 0;
+
+       virtual void shutdown_service(Service *service) = 0;
+       
+       virtual int accept_call(Call *call,
+                               Service *service,
+                               void *caller_data,
+                               struct sockaddr_rxrpc *_sa,
+                               socklen_t *_salen) = 0;
+#endif
+
+       virtual ~Transport();
+};
+
+} /* end namespace rxrpc */
+
+extern rxrpc::ref<rxrpc::Transport> create_af_rxrpc_transport();
+
+#endif /* _RXRPC_TRANSPORT_H */