]> www.infradead.org Git - users/dhowells/kafs-utils.git/commitdiff
Implement basic I/O on top of an AF_RXRPC socket
authorDavid Howells <dhowells@redhat.com>
Mon, 30 Sep 2019 09:13:00 +0000 (10:13 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 5 May 2023 09:05:25 +0000 (10:05 +0100)
Implement basic I/O on top of an AF_RXRPC socket so that the codecs
generated by rxgen.py will work.

Signed-off-by: David Howells <dhowells@redhat.com>
.gitignore
lib/Makefile
lib/af_rxrpc.c [new file with mode: 0644]
lib/list.h [new file with mode: 0644]
lib/rxrpc.h
lib/rxrpc_core.c [new file with mode: 0644]
lib/rxrpc_xdr.c [new file with mode: 0644]
lib/transport.h [new file with mode: 0644]

index d7c40eee1e5065ac94ca2b48b1c5529584320952..a6a643047c8d07578d07692ccdea4c81b218bcbc 100644 (file)
@@ -1,4 +1,8 @@
 *~
+*.o
+.*.o.d
+*.so
 lib/afs_xg.*
+lib/libkafs_utils.*
 rxgen/__pycache__/
 rxgen/parsetab.py
index a904339fa24e7d8f73646a9e6e3fb2c437bd2c20..e50daab45a86380271eb2ff4b96ac2fdecf653fb 100644 (file)
@@ -2,11 +2,26 @@ RXGEN := ../rxgen/rxgen.py
 
 RPC_HEADERS    := $(wildcard ../rpc-api/*.h)
 RPC_DEFS       := $(filter-out %test.xg, $(sort $(wildcard ../rpc-api/*.xg)))
-CFLAGS         := -g -Wall -Wformat -fpic
+CFLAGS         := -g -Wall -Wformat -fpic -O2
 
-#RPC_DEFS      := ../rpc-api/test.xg
+LIB_SRCS       := afs_xg.c rxrpc_xdr.c af_rxrpc.c rxrpc_core.c
+LIB_OBJS       := $(patsubst %.c,%.o,$(LIB_SRCS))
 
-afs_xg.o: afs_xg.c afs_xg.h rxrpc.h
+%.o: %.c
+       $(CC) $(CPPFLAGS) -MMD -MF .$@.d $(CFLAGS) -o $@ -c $<
+
+all: libkafs_utils.a
+
+libkafs_utils.a: $(LIB_OBJS)
+       $(AR) rcs -o $@ $(LIB_OBJS)
 
 afs_xg.c afs_xg.h: $(RPC_HEADERS) $(RPC_DEFS) $(wildcard ../rxgen/*.py) Makefile
        $(RXGEN) $(RPC_HEADERS) $(RPC_DEFS)
+
+DEPS           := $(wildcard .*.o.d)
+ifneq ($(DEPS),)
+include $(DEPS)
+endif
+
+clean:
+       $(RM) *~ *.o afs_xg.[ch] *.so *.a $(DEPS)
diff --git a/lib/af_rxrpc.c b/lib/af_rxrpc.c
new file mode 100644 (file)
index 0000000..814e2b0
--- /dev/null
@@ -0,0 +1,935 @@
+/* AF_RXRPC driver
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#define _XOPEN_SOURCE
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <errno.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <linux/rxrpc.h>
+#include "transport.h"
+
+#define debug(fmt, ...) \
+       do { if (rxrpc_debug_transport) printf("AFRX: "fmt, ## __VA_ARGS__); } while (0)
+#define debug_buffer(fmt, ...) \
+       do { if (rxrpc_debug_buffers) printf("BUF: "fmt, ## __VA_ARGS__); } while (0)
+
+#define RXRPC_SELECT_CALL_FOR_RECV     7       /* Specify the call for recvmsg, SIOCINQ, etc. */
+#define RXRPC_SELECT_CALL_FOR_SEND     8       /* Specify the call for splice, SIOCOUTQ, etc. */
+
+#define RXRPC_SET_SECURITY_KEY 14
+#define RXRPC_SET_SECURITY_LEVEL 15
+
+/*
+ * Add a call ID to a control message sequence.
+ */
+#define RXRPC_ADD_CALLID(control, ctrllen, id)                         \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       unsigned long *__data;                                          \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = __buffer + (ctrllen);                                  \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(*__data));            \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_USER_CALL_ID;                   \
+       __data = (void *)CMSG_DATA(__cmsg);                             \
+       *__data = (id);                                                 \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add an abort instruction to a control message sequence.
+ */
+#define RXRPC_ADD_ABORT(control, ctrllen, abort_code)                  \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       unsigned int *__data;                                           \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = __buffer + (ctrllen);                                  \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(*__data));            \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_ABORT;                          \
+       __data = (void *)CMSG_DATA(__cmsg);                             \
+       *__data = (abort_code);                                         \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add a security level instruction to a control message sequence.
+ */
+#define RXRPC_ADD_SECURITY_LEVEL(control, ctrllen, sec_level)          \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       unsigned int *__data;                                           \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = __buffer + (ctrllen);                                  \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(*__data));            \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_SET_SECURITY_LEVEL;             \
+       __data = (void *)CMSG_DATA(__cmsg);                             \
+       *__data = (sec_level);                                          \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add a total transmit length instruction to a control message sequence.
+ */
+#define RXRPC_ADD_TX_LENGTH(control, ctrllen, length)                  \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       unsigned long long __data = (length);                           \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = __buffer + (ctrllen);                                  \
+       __cmsg->cmsg_len        = CMSG_LEN(sizeof(__data));             \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = RXRPC_TX_LENGTH;                      \
+       memcpy(CMSG_DATA(__cmsg), &__data, sizeof(__data));             \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+/*
+ * Add a miscellaneous option to a control message sequence.
+ */
+#define RXRPC_ADD_MISC_OPTION(control, ctrllen, opt)                   \
+do {                                                                   \
+       void *__buffer = (control);                                     \
+       struct cmsghdr *__cmsg;                                         \
+       __cmsg = __buffer + (ctrllen);                                  \
+       __cmsg->cmsg_len        = CMSG_LEN(0);                          \
+       __cmsg->cmsg_level      = SOL_RXRPC;                            \
+       __cmsg->cmsg_type       = opt;                                  \
+       (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len);                      \
+} while (0)
+
+struct rxrpc_transport_private {
+       struct sockaddr_rxrpc   local;
+       int                     fd;
+       unsigned int            sec_level;
+
+       /* Message reception infrastructure */
+       struct list_head        rx_spare_buffers;       /* Spare bufferage for receive */
+       unsigned int            rx_nr_buffers;          /* Number of buffers on the list */
+#define CONN_MSGS 3
+#define CONN_CMSG_SIZE 128
+#define CONN_NR_IOV 4
+       struct mmsghdr          rx_msgs[CONN_MSGS];     /* Message headers */
+       struct sockaddr_rxrpc   rx_peer[CONN_MSGS];     /* Mass peer address bufferage */
+       struct iovec            rx_iov[CONN_MSGS * CONN_NR_IOV]; /* Mass data bufferage */
+       unsigned char           rx_cmsg[CONN_MSGS * CONN_CMSG_SIZE]; /* Mass control message bufferage */
+};
+
+struct rxrpc_security_private {
+       unsigned int            min_sec_level;  /* RXRPC_SECURITY_* value */
+};
+
+struct rxrpc_call_private {
+       bool                    known_to_kernel;
+};
+
+/*
+ * dump the control messages
+ */
+static __attribute__((unused))
+void dump_cmsg(const char *dir, struct msghdr *msg)
+{
+       struct timespec tv;
+       struct cmsghdr *cmsg;
+       unsigned long long tx_len;
+       unsigned long user_id;
+       unsigned char *p;
+       int abort_code;
+       int n;
+
+       for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+               n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+               p = CMSG_DATA(cmsg);
+
+               debug("%s CMSG: %zu: ", dir, cmsg->cmsg_len);
+
+               switch (cmsg->cmsg_level) {
+               case SOL_SOCKET:
+                       switch (cmsg->cmsg_type) {
+                       case SO_TIMESTAMPNS_NEW:
+                               printf("SO_TIMESTAMPNS_NEW: ");
+                               if (n != sizeof(tv))
+                                       goto dump_data;
+                               memcpy(&tv, p, sizeof(tv));
+                               printf("%lu.%lu\n", tv.tv_sec, tv.tv_nsec);
+                               continue;
+                       default:
+                               printf("SOCKET_%d", cmsg->cmsg_type);
+                               break;
+                       }
+                       break;
+
+               case SOL_RXRPC:
+                       switch (cmsg->cmsg_type) {
+                       case RXRPC_USER_CALL_ID:
+                               printf("RXRPC_USER_CALL_ID: ");
+                               if (n != sizeof(user_id))
+                                       goto dump_data;
+                               memcpy(&user_id, p, sizeof(user_id));
+                               printf("%lx\n", user_id);
+                               continue;
+
+                       case RXRPC_ABORT:
+                               printf("RXRPC_ABORT: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%d\n", abort_code);
+                               continue;
+
+                       case RXRPC_ACK:
+                               printf("RXRPC_ACK");
+                               if (n != 0)
+                                       goto dump_data_colon;
+                               goto print_nl;
+
+                       case RXRPC_NET_ERROR:
+                               printf("RXRPC_NET_ERROR: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%s\n", strerror(abort_code));
+                               continue;
+
+                       case RXRPC_BUSY:
+                               printf("RXRPC_BUSY");
+                               if (n != 0)
+                                       goto dump_data_colon;
+                               goto print_nl;
+
+                       case RXRPC_LOCAL_ERROR:
+                               printf("RXRPC_LOCAL_ERROR: ");
+                               if (n != sizeof(abort_code))
+                                       goto dump_data;
+                               memcpy(&abort_code, p, sizeof(abort_code));
+                               printf("%s\n", strerror(abort_code));
+                               continue;
+
+                       case RXRPC_UPGRADE_SERVICE:
+                               printf("RXRPC_UPGRADE_SERVICE\n");
+                               continue;
+                       case RXRPC_TX_LENGTH:
+                               printf("RXRPC_TX_LENGTH: ");
+                               if (n != sizeof(tx_len))
+                                       goto dump_data;
+                               memcpy(&tx_len, p, sizeof(tx_len));
+                               printf("%llu\n", tx_len);
+                               continue;
+                       case RXRPC_SET_CALL_TIMEOUT:
+                               printf("RXRPC_SET_CALL_TIMEOUT\n");
+                               continue;
+                       default:
+                               printf("RXRPC_%d", cmsg->cmsg_type);
+                               break;
+                       }
+                       break;
+
+               default:
+                       printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type);
+                       break;
+               }
+
+
+       dump_data_colon:
+               printf(": ");
+       dump_data:
+               printf("{");
+               for (; n > 0; n--, p++)
+                       printf("%02x", *p);
+
+       print_nl:
+               printf("}\n");
+       }
+}
+
+/*
+ * Initialise the transport module
+ */
+static bool af_rxrpc_create_local_endpoint(struct rxrpc_local_endpoint *endpoint)
+{
+       struct rxrpc_transport_private *trans;
+       int opt, i;
+
+       if (endpoint->addr_len > 0) {
+               if (endpoint->addr_len > sizeof(trans->local.transport)) {
+                       errno = EINVAL;
+                       return false;
+               }
+
+               switch (endpoint->addr.transport.family) {
+               case AF_INET:
+               case AF_INET6:
+                       break;
+               default:
+                       fprintf(stderr, "Unsupported transport type\n");
+                       errno = EPROTOTYPE;
+                       return false;
+               }
+       }
+
+       trans = calloc(1, sizeof(*trans));
+       if (!trans)
+               return false;
+
+       init_list_head(&trans->rx_spare_buffers);
+
+       /* Set up the message descriptors */
+       for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++)
+               trans->rx_iov[i].iov_len = RXGEN_BUFFER_SIZE;
+
+       for (i = 0; i < CONN_MSGS; i++) {
+               struct mmsghdr *m = &trans->rx_msgs[i];
+
+               m->msg_hdr.msg_iov      = trans->rx_iov + i * CONN_NR_IOV;
+               m->msg_hdr.msg_iovlen   = CONN_NR_IOV;
+               m->msg_hdr.msg_name     = &trans->rx_peer[i];
+               m->msg_hdr.msg_namelen  = sizeof(trans->rx_peer[i]);
+               m->msg_hdr.msg_control  = trans->rx_cmsg;
+               m->msg_hdr.msg_controllen = CONN_CMSG_SIZE;
+               m->msg_hdr.msg_flags    = 0;
+       }
+
+       /* Open up a socket for talking to the AF_RXRPC module */
+       trans->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET6);
+       if (trans->fd < 0)
+               goto error_priv;
+
+       opt = 1;
+       if (setsockopt(trans->fd, SOL_SOCKET, SO_TIMESTAMPNS_NEW,
+                      (char *)&opt, sizeof(opt)) == -1) {
+               perror("SO_TIMESTAMPNS_NEW");
+               goto error_sock;
+       }
+
+       if (endpoint->addr_len > 0) {
+               struct sockaddr_rxrpc *srx = &trans->local;
+
+               srx->srx_family         = AF_RXRPC;
+               srx->transport_type     = SOCK_DGRAM;
+               srx->transport_len      = endpoint->addr_len;
+               memcpy(&srx->transport, &endpoint->addr, endpoint->addr_len);
+
+               if (bind(trans->fd, (struct sockaddr *)srx, sizeof(*srx)) == -1)
+                       goto error_sock;
+       }
+
+       endpoint->priv = trans;
+       return true;
+
+error_sock:
+       close(trans->fd);
+error_priv:
+       free(trans);
+       return false;
+}
+
+/*
+ * Free the transport module
+ */
+static void af_rxrpc_free_local_endpoint(struct rxrpc_local_endpoint *endpoint)
+{
+       struct rxrpc_transport_private *trans = endpoint->priv;
+       struct rxrpc_rx_buf *b;
+       int i;
+
+       close(trans->fd);
+
+       for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++)
+               free(trans->rx_iov[i].iov_base);
+
+       while (!list_empty(&trans->rx_spare_buffers)) {
+               b = list_entry(trans->rx_spare_buffers.next, struct rxrpc_rx_buf, link);
+               list_del(&b->link);
+               free(b->buf);
+               free(b);
+       }
+
+       free(trans);
+}
+
+/*
+ * Set up a security descriptor.  Note that the key name can be NULL (for
+ * instance if -noauth was passed).
+ */
+static bool af_rxrpc_new_security(struct rxrpc_local_endpoint *endpoint,
+                                 struct rxrpc_security *security,
+                                 struct rxrpc_result *_result)
+{
+       struct rxrpc_transport_private *trans = endpoint->priv;
+       struct rxrpc_security_private *sec;
+
+       sec = calloc(1, sizeof(*sec));
+       if (!sec)
+               return rxrpc_nomem(_result);
+
+       switch (security->level) {
+       case rxrpc_security_no_auth:
+               break;
+       case rxrpc_security_local_auth:
+               fprintf(stderr, "Local auth not supported by AF_RXRPC transport\n");
+               goto inval;
+       case rxrpc_security_unset:
+       case rxrpc_security_clear:
+               if (security->key_name_len == 0)
+                       goto inval;
+               sec->min_sec_level = RXRPC_SECURITY_PLAIN;
+               break;
+       case rxrpc_security_integrity_only:
+               if (security->key_name_len == 0)
+                       goto inval;
+               sec->min_sec_level = RXRPC_SECURITY_AUTH;
+               break;
+       case rxrpc_security_encrypt:
+               if (security->key_name_len == 0)
+                       goto inval;
+               sec->min_sec_level = RXRPC_SECURITY_ENCRYPT;
+               break;
+       }
+
+       if (security->level != rxrpc_security_no_auth) {
+               debug("OPT RXRPC_SECURITY_KEY %s\n", security->key_name);
+               if (setsockopt(trans->fd, SOL_RXRPC, RXRPC_SECURITY_KEY,
+                              security->key_name, security->key_name_len) < 0) {
+                       free(sec);
+                       return rxrpc_syserror(_result, "setsockopt(RXRPC_SECURITY_KEY)");
+               }
+
+               debug("OPT RXRPC_MIN_SECURITY_LEVEL %u\n", sec->min_sec_level);
+               if (setsockopt(trans->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
+                              &sec->min_sec_level, sizeof(sec->min_sec_level)) < 0) {
+                       free(sec);
+                       return rxrpc_syserror(_result, "setsockopt(RXRPC_MIN_SECURITY_LEVEL)");
+               }
+       }
+
+       trans->sec_level = sec->min_sec_level;
+       security->priv = sec;
+       return true;
+
+inval:
+       free(sec);
+       _result->source = rxrpc_error_from_parameters;
+       _result->error = EINVAL;
+       return false;
+}
+
+/*
+ * Free a security descriptor
+ */
+static void af_rxrpc_free_security(struct rxrpc_security *security)
+{
+       free(security->priv);
+}
+
+/*
+ * Allocate a call handle for a new call.
+ */
+static bool af_rxrpc_make_call(struct rxrpc_call *call, struct rxrpc_result *_result)
+{
+       struct rxrpc_call_private *ch;
+
+       ch = calloc(1, sizeof(*call));
+       if (!ch)
+               return rxrpc_nomem(_result);
+
+       call->priv = ch;
+       return true;
+}
+
+/*
+ * Dump data from an output buffer.
+ */
+static void dump_enc_buffers(struct rxrpc_buffer *buf)
+{
+       unsigned int pos = 0;
+       bool nl = false;
+       int ioc, i;
+
+       for (ioc = 0; ioc < buf->ioc; ioc++) {
+               for (i = 0; i < buf->msg_iov[ioc].iov_len; i++) {
+                       if (!(pos & 3)) {
+                               if (!(pos & 31))
+                                       printf("%06x: ", pos);
+                               else
+                                       printf(" ");
+                       }
+
+                       printf("%02x", ((unsigned char *)buf->msg_iov[ioc].iov_base)[i]);
+                       pos++;
+                       nl = false;
+                       if ((pos & 31) == 0) {
+                               printf("\n");
+                               nl = true;
+                       }
+               }
+       }
+
+       if (!nl)
+               printf("\n");
+}
+
+/*
+ * Send buffered data.
+ */
+static bool af_rxrpc_send_data(struct rxrpc_call *call,
+                              struct rxrpc_buffer *buf,
+                              bool more,
+                              struct rxrpc_result *_result)
+{
+       struct msghdr msg;
+       unsigned char control[128];
+       unsigned int flags = 0, tmp;
+       socklen_t olen;
+       size_t ctrllen;
+       char abuf[512];
+       int ret, i;
+
+       if (rxrpc_debug_transport)
+               printf("\n");
+
+       /* Request an operation */
+       ctrllen = 0;
+       RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
+       if (call->exclusive)
+               RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_EXCLUSIVE_CALL);
+       if (call->upgrade_service)
+               RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_UPGRADE_SERVICE);
+       if (call->total_tx_size) {
+               debug("total Tx size %llu\n", call->total_tx_size);
+               RXRPC_ADD_TX_LENGTH(control, ctrllen, call->total_tx_size);
+       }
+
+       msg.msg_name            = &call->peer;
+       msg.msg_namelen         = call->peer_len;
+       msg.msg_iov             = buf ? buf->msg_iov : NULL;
+       msg.msg_iovlen          = buf ? buf->ioc : 0;
+       msg.msg_control         = control;
+       msg.msg_controllen      = ctrllen;
+       msg.msg_flags           = 0;
+
+       if (more)
+               flags |= MSG_MORE;
+
+       switch (call->peer.transport.family) {
+       case AF_INET:
+               inet_ntop(call->peer.transport.family, &call->peer.transport.sin.sin_addr,
+                         abuf, sizeof(abuf));
+               debug("Tx %u Addr %s:%u\n",
+                     call->peer.srx_service, abuf, ntohs(call->peer.transport.sin.sin_port));
+               break;
+       case AF_INET6:
+               inet_ntop(call->peer.transport.family, &call->peer.transport.sin6.sin6_addr,
+                         abuf, sizeof(abuf));
+               debug("Tx %u Addr [%s]:%u\n",
+                     call->peer.srx_service, abuf, ntohs(call->peer.transport.sin6.sin6_port));
+               break;
+       }
+
+       if (rxrpc_debug_transport) {
+               dump_cmsg("Tx", &msg);
+               for (i = 0; i < msg.msg_iovlen; i++)
+                       debug("Tx IOV[%02u] %04zu %p\n",
+                             i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base);
+       }
+       if (rxrpc_debug_data)
+               dump_enc_buffers(buf);
+
+       olen = sizeof(tmp);
+       getsockopt(call->endpoint->priv->fd, SOL_SOCKET, SO_ERROR, (char *)&tmp, &olen);
+
+       /* Send the data */
+       ret = sendmsg(call->endpoint->priv->fd, &msg, flags);
+       if (ret < 0) {
+               debug("SENDMSG: %m\n");
+               return rxrpc_syserror(_result, "sendmsg");
+       }
+
+       debug("SENDMSG: %d%s\n", ret, more ? " [more]" : "");
+       call->priv->known_to_kernel = true;
+       return ret >= 0;
+}
+
+/*
+ * Process the metadata given to us by recvmsg().
+ */
+static bool af_rxrpc_process_anciliary(struct msghdr *msg,
+                                      struct rxrpc_call **_call,
+                                      struct rxrpc_result *result)
+{
+       struct rxrpc_call *call = NULL;
+       struct cmsghdr *cmsg;
+       unsigned long tmp;
+       enum rxrpc_recv_condition cond;
+
+       if (rxrpc_debug_transport)
+               dump_cmsg("Rx", msg);
+
+       if (msg->msg_flags & MSG_MORE)
+               cond = rxrpc_recv_more;
+       else if (msg->msg_flags & MSG_EOR)
+               cond = rxrpc_recv_complete;
+       else
+               cond = rxrpc_recv_complete_now_send;
+
+       for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+               unsigned char *p = CMSG_DATA(cmsg);
+               int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+
+               if (cmsg->cmsg_level == SOL_SOCKET) {
+                       switch (cmsg->cmsg_type) {
+                       case SO_TIMESTAMPNS_NEW:
+                               if (call->have_rx_timestamp)
+                                       continue;
+
+                               if (n != sizeof(call->rx_timestamp)) {
+                                       fprintf(stderr, "SO_TIMESTAMPNS wrong size\n");
+                                       abort();
+                               }
+
+                               memcpy(&call->rx_timestamp, p, n);
+                               call->have_rx_timestamp = true;
+                               debug("Rx tstamp\n");
+                               break;
+                       }
+               }
+
+               if (cmsg->cmsg_level != SOL_RXRPC)
+                       continue;
+
+               switch (cmsg->cmsg_type) {
+               case RXRPC_USER_CALL_ID:
+                       if (n != sizeof(tmp)) {
+                               fprintf(stderr, "User call ID wrong size\n");
+                               abort();
+                       }
+
+                       memcpy(&tmp, p, sizeof(tmp));
+                       call = (struct rxrpc_call *)tmp;
+                       *_call = call;
+                       break;
+
+               case RXRPC_ABORT:
+                       if (n == 4)
+                               memcpy(&call->abort_code, p, 4);
+                       call->error = ECONNABORTED;
+                       result->source = rxrpc_error_remote_abort;
+                       result->error = ECONNABORTED;
+                       result->abort_code = call->abort_code;
+                       cond = rxrpc_recv_abort;
+                       break;
+
+               case RXRPC_NET_ERROR:
+                       if (n != sizeof(call->error))
+                               goto parse_error;
+                       memcpy(&call->error, p, 4);
+                       result->source = rxrpc_error_from_network;
+                       result->error = call->error;
+                       result->abort_code = 0;
+                       cond = rxrpc_recv_net_error;
+                       break;
+
+               case RXRPC_LOCAL_ERROR:
+                       if (n != sizeof(call->error))
+                               goto parse_error;
+                       memcpy(&call->error, p, 4);
+                       result->source = rxrpc_error_from_system;
+                       result->error = call->error;
+                       result->abort_code = 0;
+                       cond = rxrpc_recv_local_error;
+                       break;
+
+               case RXRPC_BUSY:
+                       result->source = rxrpc_error_peer_busy;
+                       result->error = ECONNREFUSED;
+                       result->abort_code = 0;
+                       cond = rxrpc_recv_busy;
+                       break;
+
+               case RXRPC_ACK:
+                       cond = rxrpc_recv_final_ack;
+                       break;
+
+               default:
+                       fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type);
+                       break;
+               }
+       }
+
+       if (msg->msg_flags & MSG_EOR) {
+               debug("Rx EOR\n");
+               call->priv->known_to_kernel = false;
+       }
+
+       /* Adjust the call's state. */
+       switch (cond) {
+       case rxrpc_recv_more:
+               break;
+       case rxrpc_recv_complete_now_send:
+               break;
+       case rxrpc_recv_final_ack:
+               if (call->state != rx_call_sv_waiting_for_final_ack) {
+                       fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n",
+                               call->state);
+                       abort();
+               }
+               call->state = rx_call_completed;
+               call->completed = rx_call_success;
+               break;
+       case rxrpc_recv_complete:
+               call->state = rx_call_completed;
+               call->completed = rx_call_success;
+               break;
+       case rxrpc_recv_abort:
+               call->state = rx_call_completed;
+               call->completed = rx_call_remotely_aborted;
+               break;
+       case rxrpc_recv_net_error:
+               call->state = rx_call_completed;
+               call->completed = rx_call_net_error;
+               break;
+       case rxrpc_recv_local_error:
+               call->state = rx_call_completed;
+               call->completed = rx_call_local_error;
+               break;
+       case rxrpc_recv_busy:
+               call->error = ECONNREFUSED;
+               call->state = rx_call_completed;
+               call->completed = rx_call_rejected_busy;
+               break;
+       default:
+               abort();
+       }
+
+       if (call->state == rx_call_completed && call->completion_func) {
+               call->completion_func(call);
+               call->completion_func = NULL;
+       }
+
+       debug("<--af_rxrpc_recv_data() = t [c=%u]\n", cond);
+       return true;
+
+parse_error:
+       fprintf(stderr, "cmsg parse error %u\n", cmsg->cmsg_type);
+       abort();
+}
+
+/*
+ * Process a message.
+ */
+static void af_rxrpc_recv_msg(struct rxrpc_local_endpoint *endpoint,
+                             struct msghdr *msg, unsigned int len,
+                             struct rxrpc_result *result)
+{
+       struct rxrpc_transport_private *trans = endpoint->priv;
+       struct rxrpc_call *call = NULL;
+       struct iovec *iov = msg->msg_iov;
+       struct sockaddr_rxrpc *srx = msg->msg_name;
+       unsigned int tmp;
+       int i;
+
+       if (!af_rxrpc_process_anciliary(msg, &call, result))
+               return;
+
+       if (call->service_id != srx->srx_service)
+               debug("Upgraded service to %u\n", srx->srx_service);
+       call->service_id = srx->srx_service; /* Changes if service upgrade */
+       result->service_id = srx->srx_service;
+
+       if (len > CONN_NR_IOV * RXGEN_BUFFER_SIZE) {
+               fprintf(stderr, "Too much data returned\n");
+               abort();
+       }
+
+       if (rxrpc_debug_data) {
+               tmp = len;
+               for (i = 0; i < CONN_NR_IOV && tmp > 0; i++) {
+                       unsigned int seg = (tmp <= iov[i].iov_len) ? tmp : iov[i].iov_len;
+                       bool nl = false;
+                       printf("Rx IOV[%u] %zu %u %p\n", i, iov[i].iov_len, seg, iov[i].iov_base);
+
+                       unsigned int j, s = seg; // <= 32 ? seg : 32;
+                       for (j = 0; j < s; j++) {
+                               if (!(j & 3)) {
+                                       if (!(j & 31))
+                                               printf("%06x: ", j);
+                                       else
+                                               printf(" ");
+                               }
+                               printf("%02x", ((unsigned char *)iov[i].iov_base)[j]);
+                               nl = false;
+                               if ((j & 31) == 31) {
+                                       printf("\n");
+                                       nl = true;
+                               }
+                       }
+                       if (!nl)
+                               printf("\n");
+                       tmp -= seg;
+               }
+       }
+
+       /* Update the buffers with the amount of data stored. */
+       for (i = 0; i < CONN_NR_IOV && len > 0; i++) {
+               struct rxrpc_rx_buf *b;
+               unsigned int n;
+
+               b = list_entry(trans->rx_spare_buffers.next, struct rxrpc_rx_buf, link);
+               b->index = call->dec_index++;
+
+               n = RXGEN_BUFFER_SIZE;
+               if (n > len)
+                       n = len;
+               debug("BUF[%u] +%-4u %zu {%u}\n",
+                     b->index, n, call->dec_amount + n, trans->rx_nr_buffers);
+               b->len = n;
+               b->buf = iov[i].iov_base;
+               iov[i].iov_base = NULL;
+
+               trans->rx_nr_buffers--;
+               list_move_tail(&b->link, &call->dec_buffers);
+               call->dec_amount += n;
+               len -= n;
+       }
+
+       call->dec_func(call);
+}
+
+/*
+ * Receive data from a variety of calls in one go.
+ */
+static bool af_rxrpc_recv(struct rxrpc_local_endpoint *endpoint, bool nowait,
+                         struct rxrpc_result *result)
+{
+       struct rxrpc_transport_private *trans = endpoint->priv;
+       struct iovec *iov = trans->rx_iov;
+       int count, i, j;
+
+       if (rxrpc_debug_transport)
+               printf("\n");
+
+       /* Tamp down the buffers to make sure we cycle through the memory */
+       j = 0;
+       for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) {
+               if (!iov[i].iov_base)
+                       continue;
+               iov[j].iov_base = iov[i].iov_base;
+       }
+
+       for (; j < CONN_MSGS * CONN_NR_IOV; j++) {
+               iov[j].iov_base = malloc(RXGEN_BUFFER_SIZE);
+               if (!iov[j].iov_base)
+                       return rxrpc_nomem(result);
+       }
+
+       /* Make sure we have sufficient buffer descriptors */
+       while (trans->rx_nr_buffers < CONN_MSGS * CONN_NR_IOV) {
+               struct rxrpc_rx_buf *b = calloc(1, sizeof(struct rxrpc_rx_buf));
+               if (!b)
+                       return rxrpc_nomem(result);
+               list_add(&b->link, &trans->rx_spare_buffers);
+               trans->rx_nr_buffers++;
+       }
+
+       /* Receive messages.  The descriptor array is already set up and gets
+        * reset as messages are processed.
+        */
+       count = recvmmsg(trans->fd, trans->rx_msgs, CONN_MSGS,
+                        nowait ? MSG_DONTWAIT : MSG_WAITFORONE,
+                        NULL);
+       debug("RECVMMSG: %d\n", count);
+       if (count == -1)
+               return false;
+
+       for (i = 0; i < count; i++) {
+               struct mmsghdr *m = &trans->rx_msgs[i];
+               struct msghdr *msg = &m->msg_hdr;
+
+               debug("Rx DATA: %d [fl:%x]\n", m->msg_len, msg->msg_flags);
+
+               af_rxrpc_recv_msg(endpoint, msg, m->msg_len, result);
+
+               /* Reset the message descriptor. */
+               m->msg_hdr.msg_namelen  = sizeof(trans->rx_peer[i]);
+               m->msg_hdr.msg_controllen = CONN_CMSG_SIZE;
+               m->msg_hdr.msg_flags    = 0;
+       }
+
+       return true;
+}
+
+/*
+ * Abort a call.
+ */
+static void af_rxrpc_abort_call(struct rxrpc_call *call, uint32_t abort_code)
+{
+       struct msghdr msg;
+       size_t ctrllen;
+       unsigned char control[128];
+
+       if (call->priv->known_to_kernel) {
+               memset(control, 0, sizeof(control));
+               ctrllen = 0;
+               RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
+               RXRPC_ADD_ABORT(control, ctrllen, abort_code);
+
+               msg.msg_name            = &call->peer;
+               msg.msg_namelen         = call->peer_len;
+               msg.msg_iov             = NULL;
+               msg.msg_iovlen          = 0;
+               msg.msg_control         = control;
+               msg.msg_controllen      = ctrllen;
+               msg.msg_flags           = 0;
+
+               sendmsg(call->endpoint->priv->fd, &msg, 0);
+               call->state = rx_call_completed;
+               call->completed = rx_call_locally_aborted;
+       }
+}
+
+/*
+ * Terminate a call.
+ */
+static void af_rxrpc_end_call(struct rxrpc_call *call)
+{
+       if (call->priv->known_to_kernel) {
+               fprintf(stderr, "Call still known to kernel\n");
+               abort();
+       }
+       free(call->priv);
+}
+
+struct rxrpc_transport af_rxrpc_transport = {
+       .id                     = rxrpc_transport_id,
+       .name                   = "AF_RXRPC",
+       .create_local_endpoint  = af_rxrpc_create_local_endpoint,
+       .free_local_endpoint    = af_rxrpc_free_local_endpoint,
+       .new_security           = af_rxrpc_new_security,
+       .free_security          = af_rxrpc_free_security,
+       .make_call              = af_rxrpc_make_call,
+#if 0
+       .new_service            = NULL,
+       .shutdown_service       = NULL,
+       .accept_call            = NULL,
+#endif
+       .abort_call             = af_rxrpc_abort_call,
+       .end_call               = af_rxrpc_end_call,
+       .send_data              = af_rxrpc_send_data,
+       .recv                   = af_rxrpc_recv,
+};
diff --git a/lib/list.h b/lib/list.h
new file mode 100644 (file)
index 0000000..e4882ce
--- /dev/null
@@ -0,0 +1,286 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * Simple doubly linked list implementation.
+ *
+ * Some of the internal functions ("__xxx") are useful when
+ * manipulating whole lists rather than single entries, as
+ * sometimes we already know the next/prev entries and we can
+ * generate better code by using them directly rather than
+ * using the generic single-entry routines.
+ */
+
+#ifndef LIST_H
+#define LIST_H
+
+struct list_head {
+       struct list_head *next, *prev;
+};
+
+static inline void init_list_head(struct list_head *list)
+{
+       list->next = list;
+       list->prev = list;
+}
+
+/**
+ * list_empty - tests whether a list is empty
+ * @head: the list to test.
+ */
+static inline int list_empty(const struct list_head *head)
+{
+       return head->next == head;
+}
+
+/*
+ * Insert a new entry between two known consecutive entries.
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_add(struct list_head *new,
+                             struct list_head *prev,
+                             struct list_head *next)
+{
+       next->prev = new;
+       new->next = next;
+       new->prev = prev;
+       prev->next = new;
+}
+
+/**
+ * list_add - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it after
+ *
+ * Insert a new entry after the specified head.
+ * This is good for implementing stacks.
+ */
+static inline void list_add(struct list_head *new, struct list_head *head)
+{
+       __list_add(new, head, head->next);
+}
+
+/**
+ * list_add_tail - add a new entry
+ * @new: new entry to be added
+ * @head: list head to add it before
+ *
+ * Insert a new entry before the specified head.
+ * This is useful for implementing queues.
+ */
+static inline void list_add_tail(struct list_head *new, struct list_head *head)
+{
+       __list_add(new, head->prev, head);
+}
+
+/*
+ * Delete a list entry by making the prev/next entries
+ * point to each other.
+ *
+ * This is only for internal list manipulation where we know
+ * the prev/next entries already!
+ */
+static inline void __list_del(struct list_head * prev, struct list_head * next)
+{
+       next->prev = prev;
+       prev->next = next;
+}
+
+/**
+ * list_del - deletes entry from list.
+ * @entry: the element to delete from the list.
+ * Note: list_empty() on entry does not return true after this, the entry is
+ * in an undefined state.
+ */
+static inline void __list_del_entry(struct list_head *entry)
+{
+       __list_del(entry->prev, entry->next);
+}
+
+static inline void list_del(struct list_head *entry)
+{
+       __list_del_entry(entry);
+       entry->next = (void *)0xa1;
+       entry->prev = (void *)0xa3;
+}
+
+/**
+ * list_del_init - deletes entry from list and reinitialize it.
+ * @entry: the element to delete from the list.
+ */
+static inline void list_del_init(struct list_head *entry)
+{
+       __list_del_entry(entry);
+       init_list_head(entry);
+}
+
+/**
+ * list_replace - replace old entry by new one
+ * @old : the element to be replaced
+ * @new : the new element to insert
+ *
+ * If @old was empty, it will be overwritten.
+ */
+static inline void list_replace(struct list_head *old,
+                               struct list_head *new)
+{
+       new->next = old->next;
+       new->next->prev = new;
+       new->prev = old->prev;
+       new->prev->next = new;
+}
+
+static inline void list_replace_init(struct list_head *old,
+                                       struct list_head *new)
+{
+       list_replace(old, new);
+       init_list_head(old);
+}
+
+/**
+ * list_move - delete from one list and add as another's head
+ * @list: the entry to move
+ * @head: the head that will precede our entry
+ */
+static inline void list_move(struct list_head *list, struct list_head *head)
+{
+       __list_del_entry(list);
+       list_add(list, head);
+}
+
+/**
+ * list_move_tail - delete from one list and add as another's tail
+ * @list: the entry to move
+ * @head: the head that will follow our entry
+ */
+static inline void list_move_tail(struct list_head *list,
+                                 struct list_head *head)
+{
+       __list_del_entry(list);
+       list_add_tail(list, head);
+}
+
+static inline void __list_splice(const struct list_head *list,
+                                struct list_head *prev,
+                                struct list_head *next)
+{
+       struct list_head *first = list->next;
+       struct list_head *last = list->prev;
+
+       first->prev = prev;
+       prev->next = first;
+
+       last->next = next;
+       next->prev = last;
+}
+
+/**
+ * list_splice - join two lists, this is designed for stacks
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ */
+static inline void list_splice(const struct list_head *list,
+                               struct list_head *head)
+{
+       if (!list_empty(list))
+               __list_splice(list, head, head->next);
+}
+
+/**
+ * list_splice_tail - join two lists, each list being a queue
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ */
+static inline void list_splice_tail(struct list_head *list,
+                               struct list_head *head)
+{
+       if (!list_empty(list))
+               __list_splice(list, head->prev, head);
+}
+
+/**
+ * list_splice_init - join two lists and reinitialise the emptied list.
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ *
+ * The list at @list is reinitialised
+ */
+static inline void list_splice_init(struct list_head *list,
+                                   struct list_head *head)
+{
+       if (!list_empty(list)) {
+               __list_splice(list, head, head->next);
+               init_list_head(list);
+       }
+}
+
+/**
+ * list_splice_tail_init - join two lists and reinitialise the emptied list
+ * @list: the new list to add.
+ * @head: the place to add it in the first list.
+ *
+ * Each of the lists is a queue.
+ * The list at @list is reinitialised
+ */
+static inline void list_splice_tail_init(struct list_head *list,
+                                        struct list_head *head)
+{
+       if (!list_empty(list)) {
+               __list_splice(list, head->prev, head);
+               init_list_head(list);
+       }
+}
+
+#define offsetof(TYPE, MEMBER) ((size_t)&((TYPE *)0)->MEMBER)
+
+/**
+ * container_of - cast a member of a structure out to the containing structure
+ * @ptr:       the pointer to the member.
+ * @type:      the type of the container struct this is embedded in.
+ * @member:    the name of the member within the struct.
+ *
+ */
+#define container_of(ptr, type, member) ({                             \
+       void *__mptr = (void *)(ptr);                                   \
+       ((type *)(__mptr - offsetof(type, member))); })
+
+/**
+ * list_entry - get the struct for this entry
+ * @ptr:       the &struct list_head pointer.
+ * @type:      the type of the struct this is embedded in.
+ * @member:    the name of the list_head within the struct.
+ */
+#define list_entry(ptr, type, member) \
+       container_of(ptr, type, member)
+
+/**
+ * list_first_entry - get the first element from a list
+ * @ptr:       the list head to take the element from.
+ * @type:      the type of the struct this is embedded in.
+ * @member:    the name of the list_head within the struct.
+ *
+ * Note, that list is expected to be not empty.
+ */
+#define list_first_entry(ptr, type, member) \
+       list_entry((ptr)->next, type, member)
+
+/**
+ * list_next_entry - get the next element in list
+ * @pos:       the type * to cursor
+ * @member:    the name of the list_head within the struct.
+ */
+#define list_next_entry(pos, member) \
+       list_entry((pos)->member.next, typeof(*(pos)), member)
+
+/**
+ * list_for_each_entry -       iterate over list of given type
+ * @pos:       the type * to use as a loop cursor.
+ * @head:      the head for your list.
+ * @member:    the name of the list_head within the struct.
+ */
+#define list_for_each_entry(pos, head, member)                         \
+       for (pos = list_first_entry(head, typeof(*pos), member);        \
+            &pos->member != (head);                                    \
+            pos = list_next_entry(pos, member))
+
+#endif /* LIST_H */
index ad7ced8aada7055ebd141d5dda4a0a9b4847bce1..67b6d6ce1454a3291e9aa9b66b7d7b823171c387 100644 (file)
 #include <stdint.h>
 #include <stdbool.h>
 #include <string.h>
+#include <errno.h>
 #include <sys/socket.h>
 #include <sys/uio.h>
 #include <arpa/inet.h>
 #include <uuid/uuid.h>
 #include <linux/rxrpc.h>
+#include "list.h"
+
+extern bool rxrpc_debug_buffers;
+extern bool rxrpc_debug_core;
+extern bool rxrpc_debug_data;
+extern bool rxrpc_debug_ops;
+extern bool rxrpc_debug_transport;
+extern bool rxrpc_debug_xdr;
 
 typedef unsigned int net_xdr_t;
 
 struct xdr_u64 { net_xdr_t hi, lo; };
 
+enum rxrpc_security_auth_level {
+       rxrpc_security_unset,
+       rxrpc_security_no_auth,
+       rxrpc_security_local_auth,
+       rxrpc_security_clear,
+       rxrpc_security_integrity_only,
+       rxrpc_security_encrypt,
+};
+
+enum rxrpc_error_source {
+       rxrpc_error_none,
+       rxrpc_error_remote_abort,
+       rxrpc_error_peer_busy,
+       rxrpc_error_from_system,
+       rxrpc_error_from_network,
+       rxrpc_error_from_parameters,
+};
+
 struct rxrpc_result {
+       enum rxrpc_error_source source;
+       int                     error;
+       int                     abort_code;
+       unsigned short          service_id;     /* Service actually used (may be upgraded) */
+       const char              *op_name;
+};
+
+/*
+ * Local transport endpoint.
+ */
+struct rxrpc_local_endpoint {
+       struct rxrpc_transport_private  *priv;
+       struct sockaddr_rxrpc           addr;
+       socklen_t                       addr_len;
+};
+
+/*
+ * Security details to use.
+ */
+struct rxrpc_security {
+       struct rxrpc_security_private   *priv;
+       enum rxrpc_security_auth_level  level;
+       unsigned int                    key_name_len;
+       char                            key_name[];     /* Name of key to use */
 };
 
 /*
  * Call parameters.
  */
 struct rxrpc_call_params {
+       struct rxrpc_local_endpoint     *endpoint;
+       struct rxrpc_security           *security;
+       struct sockaddr_rxrpc           peer;
+       socklen_t                       peer_len;
        bool                            exclusive;
        bool                            upgrade_service;
 };
 
 enum rx_call_state {
+       rx_call_cl_not_started,
+       rx_call_cl_encoding_params,
+       rx_call_cl_receiving_response,
+
+       rx_call_sv_not_started,
+       rx_call_sv_receiving_params,
+       rx_call_sv_wait_for_no_MSG_MORE,
+       rx_call_sv_processing,
+       rx_call_sv_encoding_response_split,
+       rx_call_sv_encoding_response,
+       rx_call_sv_response_encoded,
+       rx_call_sv_waiting_for_final_ack,
        rx_call_completed,
 };
 
+enum rx_call_completion {
+       rx_call_incomplete,
+       rx_call_success,
+       rx_call_remotely_aborted,
+       rx_call_locally_aborted,
+       rx_call_net_error,
+       rx_call_local_error,
+       rx_call_rejected_busy,
+};
+
 struct rxrpc_resv {
        unsigned int            max_ioc;        /* Size of msg_iov[] */
        size_t                  alloc;          /* Size of buffer allocated */
@@ -62,21 +139,59 @@ struct rxrpc_buffer {
  * RxRPC call.
  */
 struct rxrpc_call {
+       struct rxrpc_call_private *priv;
        struct rxrpc_local_endpoint *endpoint;
+       struct sockaddr_rxrpc   peer;
+       socklen_t               peer_len;
+       unsigned short          service_id;
+       const char              *op_name;       /* Name of the RPC operation */
        enum rx_call_state      state;
+       enum rx_call_completion completed;
+       uint32_t                abort_code;
+       unsigned int            error;
 
+       bool                    client_side;    /* True if client side of call */
        bool                    split;          /* True if split-mode call */
+       bool                    exclusive;
+       bool                    upgrade_service;
+       bool                    have_rx_timestamp;
+
+       struct timespec         rx_timestamp;   /* Timestamp of first received packet */
+
+       void (*completion_func)(struct rxrpc_call *);
 
        /* Transmission phase state */
+       unsigned long long      total_tx_size;  /* Total expected transmission size */
        uint32_t                encode_error;
 
        /* Receive phase state */
+       bool                    rx_ended;       /* True if all data read */
        bool                    dec_oom;        /* True if we got ENOMEM during decode */
-       uint32_t                decode_error;
+       uint32_t                decode_error;   /* Abort code if there's a decode error. */
+       unsigned int            dec_index;
+       struct list_head        dec_buffers;    /* List of received buffers */
+       size_t                  dec_amount;     /* Amount of data in dec_buffers */
+       void (*dec_func)(struct rxrpc_call *);
 };
 
-extern bool rxrpc_receive(struct rxrpc_local_endpoint *endpoint, bool nowait,
-                         struct rxrpc_result *result);
+#define RXGEN_BUFFER_SIZE      1024
+
+struct rxrpc_rx_buf {
+       struct list_head        link;           /* Link in buffer queue */
+       uint8_t                 *buf;
+       unsigned short          len;            /* Amount of data in buffer   */
+       unsigned short          used;           /* How much data used */
+       unsigned int            index;          /* Index in buffer list */
+};
+
+extern struct rxrpc_local_endpoint *rxrpc_new_local_endpoint(const struct sockaddr_rxrpc *local_sa,
+                                                            socklen_t local_salen);
+extern void rxrpc_free_local_endpoint(struct rxrpc_local_endpoint *endpoint);
+extern struct rxrpc_security *rxrpc_new_security(struct rxrpc_local_endpoint *endpoint,
+                                                const char *key_name,
+                                                enum rxrpc_security_auth_level level,
+                                                struct rxrpc_result *_result);
+extern void rxrpc_free_security(struct rxrpc_security *security);
 
 extern struct rxrpc_buffer *rxrpc_alloc_enc_buffer(struct rxrpc_resv *resv);
 extern void rxrpc_free_buffer(struct rxrpc_buffer *buf);
@@ -84,6 +199,8 @@ extern struct rxrpc_call *rxrpc_make_call(struct rxrpc_call_params *z_params,
                                          struct rxrpc_buffer *z_request,
                                          const char *op_name,
                                          struct rxrpc_result *_result);
+extern bool rxrpc_receive(struct rxrpc_local_endpoint *endpoint, bool nowait,
+                         struct rxrpc_result *result);
 extern void rxrpc_receive_reply(struct rxrpc_call *call);
 extern bool rxrpc_request_received(struct rxrpc_call *call);
 extern bool rxrpc_read(struct rxrpc_call *z_call, void *buffer, size_t buffer_size);
@@ -91,7 +208,7 @@ extern bool rxrpc_send_data(struct rxrpc_call *call, struct rxrpc_buffer *buf, b
                            struct rxrpc_result *_result);
 extern void rxrpc_abort_call(struct rxrpc_call *call, uint32_t abort_code);
 extern bool rxrpc_end_call(struct rxrpc_call *call, uint32_t abort_code);
-extern bool rxrpc_call_failed(struct rxrpc_call *call);
+extern bool rxrpc_call_failed(struct rxrpc_result *result);
 
 /*
  * XDR encoding size calculation routines
@@ -212,4 +329,24 @@ static inline uint64_t rxrpc_dec_u64(struct rxrpc_call *call)
        return x;
 }
 
+/*
+ * Error handling routines
+ */
+static inline __attribute__((format(printf,2,3)))
+bool rxrpc_syserror(struct rxrpc_result *result, const char *fmt, ...)
+{
+       result->source = rxrpc_error_from_system;
+       result->error = errno;
+       result->abort_code = 0;
+       return false;
+}
+
+static inline bool rxrpc_nomem(struct rxrpc_result *result)
+{
+       result->source = rxrpc_error_from_system;
+       result->error = ENOMEM;
+       result->abort_code = 0;
+       return false;
+}
+
 #endif /* RXRPC_H */
diff --git a/lib/rxrpc_core.c b/lib/rxrpc_core.c
new file mode 100644 (file)
index 0000000..de2818f
--- /dev/null
@@ -0,0 +1,326 @@
+/* rxrpc core
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include "rxrpc.h"
+#include "transport.h"
+
+bool rxrpc_debug_buffers;      /* Debug buffer handling */
+bool rxrpc_debug_core;         /* Debug core management */
+bool rxrpc_debug_data;         /* Dump data */
+bool rxrpc_debug_ops;          /* Debug operations */
+bool rxrpc_debug_transport;    /* Debug transport */
+bool rxrpc_debug_xdr;          /* Debug XDR encoding/decoding */
+
+#define debug(fmt, ...) \
+       do { if (rxrpc_debug_core) printf("CORE: "fmt, ## __VA_ARGS__); } while (0)
+#define debug_op(fmt, ...) \
+       do { if (rxrpc_debug_ops) printf("OP: "fmt, ## __VA_ARGS__); } while (0)
+
+static struct rxrpc_transport *rxrpc_transport = &af_rxrpc_transport;
+
+/*
+ * Set up a new transport
+ */
+struct rxrpc_local_endpoint *rxrpc_new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+                                                     socklen_t local_len)
+{
+       struct rxrpc_local_endpoint *endpoint;
+
+       if (local_len > sizeof(endpoint->addr)) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       if (!rxrpc_transport) {
+               fprintf(stderr, "No Rx transport loaded\n");
+               errno = EINVAL;
+               return NULL;
+       }
+
+       endpoint = calloc(1, sizeof(*endpoint));
+       if (!endpoint)
+               return NULL;
+
+       if (local_len > 0)
+               memcpy(&endpoint->addr, local_addr, local_len);
+
+       if (!rxrpc_transport->create_local_endpoint(endpoint)) {
+               free(endpoint);
+               return NULL;
+       }
+
+       return endpoint;
+}
+
+/*
+ * Destroy a transport
+ */
+void rxrpc_free_local_endpoint(struct rxrpc_local_endpoint *endpoint)
+{
+       rxrpc_transport->free_local_endpoint(endpoint);
+       free(endpoint);
+}
+
+/*
+ * Set up a new security context.
+ */
+struct rxrpc_security *rxrpc_new_security(struct rxrpc_local_endpoint *endpoint,
+                                         const char *key_name,
+                                         enum rxrpc_security_auth_level level,
+                                         struct rxrpc_result *_result)
+{
+       struct rxrpc_security *security;
+       size_t len = 0;
+
+       if (level < rxrpc_security_unset ||
+           level > rxrpc_security_encrypt) {
+               _result->source = rxrpc_error_from_parameters;
+               _result->error = EINVAL;
+               return NULL;
+       }
+
+       if (key_name)
+               len = strlen(key_name) + 1;
+
+       security = calloc(1, sizeof(*security) + len);
+       if (!security) {
+               rxrpc_nomem(_result);
+               return NULL;
+       }
+
+       security->level = level;
+       if (key_name) {
+               security->key_name_len = len - 1;
+               memcpy(security->key_name, key_name, len);
+       }
+
+       if (!rxrpc_transport->new_security(endpoint, security, _result)) {
+               free(security);
+               return NULL;
+       }
+
+       return security;
+}
+
+/*
+ * Destroy a security context.
+ */
+void rxrpc_free_security(struct rxrpc_security *security)
+{
+       if (security) {
+               rxrpc_transport->free_security(security);
+               free(security);
+       }
+}
+
+/*
+ * Set up a client call and prepare for encoding.
+ */
+struct rxrpc_call *rxrpc_make_call(struct rxrpc_call_params *params,
+                                  struct rxrpc_buffer *buf,
+                                  const char *op_name,
+                                  struct rxrpc_result *_result)
+{
+       struct rxrpc_call *call;
+
+       _result->op_name = op_name;
+
+       if (params->peer_len < 5 * 2 ||
+           params->peer_len > sizeof(call->peer)) {
+               errno = EDESTADDRREQ;
+               rxrpc_syserror(_result, "Peer address too small");
+               return NULL;
+       }
+
+       if (params->peer.srx_family != AF_RXRPC) {
+               errno = EPROTOTYPE;
+               rxrpc_syserror(_result, "Peer address family unsupported");
+               return NULL;
+       }
+
+       call = calloc(1, sizeof(*call));
+       if (!call) {
+               rxrpc_nomem(_result);
+               return NULL;
+       }
+
+       call->endpoint = params->endpoint;
+       call->service_id = params->peer.srx_service;
+       call->state = rx_call_cl_not_started;
+       call->client_side = true;
+       call->total_tx_size = buf->size;
+       call->exclusive = params->exclusive;
+       call->upgrade_service = params->upgrade_service;
+       call->op_name = op_name;
+       call->peer_len = params->peer_len;
+       memcpy(&call->peer, &params->peer, params->peer_len);
+
+       call->dec_func = rxrpc_receive_reply;
+
+       init_list_head(&call->dec_buffers);
+
+       debug_op("Send %s\n", op_name);
+       if (!rxrpc_transport->make_call(call, _result))
+               goto error;
+
+       return call;
+
+error:
+       free(call);
+       return NULL;
+}
+
+/*
+ * Abort a call.
+ */
+void rxrpc_abort_call(struct rxrpc_call *call, uint32_t abort_code)
+{
+       rxrpc_transport->abort_call(call, abort_code);
+       call->state = rx_call_completed;
+       call->completed = rx_call_locally_aborted;
+}
+
+/*
+ * End a call, aborting it if necessary.
+ */
+bool rxrpc_end_call(struct rxrpc_call *call, uint32_t abort_code)
+{
+       enum rx_call_completion completed;
+       struct rxrpc_rx_buf *b;
+
+       if (call->state != rx_call_completed)
+               rxrpc_abort_call(call, abort_code);
+       rxrpc_transport->end_call(call);
+
+       while (!list_empty(&call->dec_buffers)) {
+               b = list_entry(call->dec_buffers.next, struct rxrpc_rx_buf, link);
+               list_del(&b->link);
+               free(b->buf);
+               free(b);
+       }
+
+       completed = call->completed;
+       free(call);
+       return completed == rx_call_success;
+}
+
+/*
+ * Send buffered data.
+ */
+bool rxrpc_send_data(struct rxrpc_call *call, struct rxrpc_buffer *buf, bool more,
+                    struct rxrpc_result *_result)
+{
+       debug("-->rxrpc_send_data(%u,%u)\n", call->state, more);
+
+       /* Switch into encode state */
+       switch (call->state) {
+       case rx_call_cl_not_started:
+       case rx_call_sv_processing:
+               call->state++;
+               break;
+       case rx_call_cl_encoding_params:
+       case rx_call_sv_encoding_response:
+               break;
+       default:
+               fprintf(stderr, "RxRPC: Send in bad call state (%d)\n", call->state);
+               abort();
+       }
+
+       if (!rxrpc_transport->send_data(call, buf, more, _result))
+               return false;
+
+       if (!more) {
+               call->state++;
+               if (call->state == rx_call_cl_encoding_params ||
+                   call->state == rx_call_sv_encoding_response)
+                       call->state++;
+       }
+
+       return true;
+}
+
+/*
+ * Reply decoder.
+ */
+void rxrpc_receive_reply(struct rxrpc_call *call)
+{
+       if (call->state == rx_call_completed &&
+           call->completed == rx_call_success)
+               debug("Reply received\n");
+}
+
+/*
+ * End of split notification.
+ */
+void rxrpc_end_split(struct rxrpc_call *call,
+                    struct rxrpc_result *_result)
+{
+       switch (call->state) {
+       case rx_call_cl_encoding_params:
+               rxrpc_transport->send_data(call, NULL, false, _result);
+               break;
+
+       case rx_call_cl_receiving_response:
+               call->dec_func = rxrpc_receive_reply;
+               call->dec_func(call);
+               break;
+
+       default:
+               fprintf(stderr, "rxrpc_end_split() called in state %u\n",
+                       call->state);
+               return;
+       }
+}
+
+/*
+ * End of reception of request.  There should be no more data in the buffers.
+ */
+bool rxrpc_request_received(struct rxrpc_call *call)
+{
+       switch (call->state) {
+       case rx_call_sv_receiving_params:
+
+       default:
+               fprintf(stderr, "rxrpc_request_received() called in state %u\n",
+                       call->state);
+               return false;
+       }
+}
+
+/*
+ * Receive data from the local endpoint.
+ */
+bool rxrpc_receive(struct rxrpc_local_endpoint *endpoint, bool nowait,
+                  struct rxrpc_result *result)
+{
+       return rxrpc_transport->recv(endpoint, nowait, result);
+}
+
+bool rxrpc_call_failed(struct rxrpc_result *result)
+{
+       char buf[128];
+
+       if (result->source == rxrpc_error_remote_abort) {
+               fprintf(stderr, "OP: %s aborted: %d\n",
+                       result->op_name, result->abort_code);
+       } else {
+               buf[0] = 0;
+               strerror_r(result->error, buf, sizeof(buf));
+               fprintf(stderr, "OP: %s failed: %s\n",
+                       result->op_name, buf);
+       }
+
+       return false;
+}
diff --git a/lib/rxrpc_xdr.c b/lib/rxrpc_xdr.c
new file mode 100644 (file)
index 0000000..ed504d1
--- /dev/null
@@ -0,0 +1,256 @@
+/* XDR coding and decoding.
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "rxrpc.h"
+
+#define debug(fmt, ...) \
+       do { if (rxrpc_debug_xdr) printf("XDR: "fmt, ## __VA_ARGS__); } while (0)
+#define debug_buffer(fmt, ...) \
+       do { if (rxrpc_debug_buffers) printf("BUF: "fmt, ## __VA_ARGS__); } while (0)
+
+#define UAENOMEM 0x2f6df0b
+
+static inline size_t rxrpc_calc_padding(size_t x)
+{
+       return (4 - (x & 3)) & 3;
+}
+
+/*
+ * Set up an encoded buffer.
+ */
+struct rxrpc_buffer *rxrpc_alloc_enc_buffer(struct rxrpc_resv *resv)
+{
+       struct rxrpc_buffer *buf;
+       size_t alloc;
+
+       buf = calloc(1, sizeof(struct rxrpc_buffer));
+       if (!buf)
+               return NULL;
+
+       if (resv->max_ioc == 0)
+               abort();
+       buf->msg_iov = calloc(resv->max_ioc, sizeof(struct iovec));
+       if (!buf->msg_iov)
+               goto error;
+
+       alloc = (resv->alloc + 3) / 4;
+       buf->enc_buf = calloc(alloc, sizeof(net_xdr_t));
+       if (!buf->enc_buf)
+               goto error;
+
+       buf->size = resv->alloc + resv->additional;
+       buf->enc_buf_p = buf->enc_buf;
+       buf->enc_buf_end = buf->enc_buf + alloc;
+       buf->msg_iov[0].iov_base = buf->enc_buf_p;
+       debug_buffer("Tx ALLOC s=%zx a=%zx ioc=%u\n", buf->size, resv->alloc, resv->max_ioc);
+       return buf;
+
+error:
+       rxrpc_free_buffer(buf);
+       return NULL;
+}
+
+void rxrpc_free_buffer(struct rxrpc_buffer *buf)
+{
+       if (buf) {
+               free(buf->msg_iov);
+               free(buf->enc_buf);
+               free(buf);
+       }
+}
+
+/*
+ * Advance the iov array by two slots, leaving a slot for a blob.
+ */
+static void rxrpc_enc_advance_iov_by_2(struct rxrpc_buffer *buf)
+{
+       unsigned int ioc = buf->ioc;
+
+       /* Note that iov_base may not be 4-bytes aligned if it starts with blob
+        * padding.
+        */
+       buf->msg_iov[ioc].iov_len =
+               (void *)buf->enc_buf_p - buf->msg_iov[ioc].iov_base;
+
+       ioc += 2;
+       buf->msg_iov[ioc].iov_base = buf->enc_buf_p;
+       buf->ioc = ioc;
+}
+
+/*
+ * Encode an opaque blob to XDR.  We copy in the blob if it's small, otherwise
+ * we assign it its own iov slot.
+ */
+void rxrpc_enc_opaque(struct rxrpc_buffer *buf,
+                     const void *blob, unsigned int size, unsigned int max)
+{
+       unsigned int ioc;
+       size_t padsize = rxrpc_calc_padding(size);
+       size_t total = size + padsize;
+
+       rxrpc_enc(buf, size);
+
+       if (rxrpc_resv_opaque_inline(size, max)) {
+               if (size & 3)
+                       buf->enc_buf_p[size >> 2] = 0; /* Clear padding */
+               memcpy(buf->enc_buf_p, blob, size);
+               buf->enc_buf_p += total / sizeof(net_xdr_t);
+       } else {
+               rxrpc_enc_advance_iov_by_2(buf);
+               ioc = buf->ioc;
+               buf->msg_iov[ioc - 1].iov_len = size;
+               buf->msg_iov[ioc - 1].iov_base = (void *)blob;
+               if (padsize > 0) {
+                       rxrpc_enc(buf, 0);
+                       buf->msg_iov[ioc].iov_base += 4 - padsize;
+                       buf->msg_iov[ioc].iov_len += padsize;
+               }
+       }
+}
+
+static bool rxrpc_dec_abort(struct rxrpc_call *call, unsigned int abort_code)
+{
+       call->decode_error = abort_code;
+       rxrpc_abort_call(call, call->decode_error);
+       return false;
+}
+
+static bool rxrpc_dec_underrun(struct rxrpc_call *call, size_t size)
+{
+       fprintf(stderr, "rxrpc XDR decoder underrun (%zx/%zx)\n",
+               call->dec_amount, size);
+       return rxrpc_dec_abort(call, call->client_side ? RXGEN_CC_UNMARSHAL : RXGEN_SS_UNMARSHAL);
+}
+
+/*
+ * Read data from the receive buffers.
+ */
+bool rxrpc_read(struct rxrpc_call *call, void *buffer, size_t buffer_size)
+{
+       struct rxrpc_rx_buf *b;
+       unsigned int seg;
+       size_t offset = 0;
+       int i;
+
+       debug("READ %zu/%zu\n", buffer_size, call->dec_amount);
+
+       if (buffer_size > call->dec_amount) {
+               if (call->state == rx_call_completed &&
+                   call->completed != rx_call_success)
+                       return false;
+               if (call->rx_ended)
+                       return rxrpc_dec_underrun(call, buffer_size);
+               printf("Partial fail\n");
+               return true;
+       }
+
+       while (offset < buffer_size && !list_empty(&call->dec_buffers)) {
+               b = list_entry(call->dec_buffers.next, struct rxrpc_rx_buf, link);
+
+               seg = b->len - b->used;
+               if (seg > buffer_size - offset)
+                       seg = buffer_size - offset;
+
+               debug("B[%u] %zu/%zu from %u/%zu\n",
+                      b->index, offset, buffer_size, seg, call->dec_amount);
+
+               memcpy(buffer + offset, b->buf + b->used, seg);
+               offset += seg;
+               b->used += seg;
+               call->dec_amount -= seg;
+
+               if (b->used == b->len) {
+                       list_del(&b->link);
+                       free(b->buf);
+                       free(b);
+               } else {
+                       if (offset != buffer_size)
+                               abort();
+               }
+       }
+
+       if (offset < buffer_size) {
+               fprintf(stderr, "Inconsistent amount of buffered rx data\n");
+               abort();
+       }
+
+       if (list_empty(&call->dec_buffers) && call->dec_amount != 0) {
+               fprintf(stderr, "call->dec_amount should be zero\n");
+               abort();
+       }
+
+       if (rxrpc_debug_xdr) {
+               seg = buffer_size <= 32 ? buffer_size : 32;
+               debug("");
+               for (i = 0; i < seg; i++)
+                       printf("%02x", ((unsigned char *)buffer)[i]);
+               printf("\n");
+       }
+
+       return true;
+}
+
+/*
+ * Decode a 32-bit integer.
+ */
+uint32_t rxrpc_dec(struct rxrpc_call *call)
+{
+       uint32_t x;
+
+       if (rxrpc_read(call, &x, sizeof(x)))
+               return ntohl(x);
+       return 0;
+}
+
+/*
+ * Decode a 32-bit integer and check limit.
+ */
+uint32_t rxrpc_dec_limit(struct rxrpc_call *call, size_t limit)
+{
+       uint32_t x;
+
+       x = rxrpc_dec(call);
+       if (x > limit && !call->decode_error) {
+               fprintf(stderr, "XDR: Decoded value exceeded limit\n");
+               call->decode_error = call->client_side ? RXGEN_CC_UNMARSHAL : RXGEN_SS_UNMARSHAL;
+               rxrpc_abort_call(call, call->decode_error);
+               return 0;
+       }
+
+       return x;
+}
+
+/*
+ * Decode a blob.  We allocate enough space to hold the padding and/or a
+ * NUL-termination.
+ */
+void *rxrpc_dec_blob(struct rxrpc_call *call, size_t len)
+{
+       unsigned char *p;
+       size_t padded_len = xdr_round_up(len);
+
+       p = malloc(padded_len + (padded_len == len ? 1 : 0));
+       if (!p) {
+               rxrpc_dec_abort(call, UAENOMEM);
+               return NULL;
+       }
+
+       if (!rxrpc_read(call, p, padded_len)) {
+               free(p);
+               return NULL;
+       }
+
+       p[len] = 0;
+       return p;
+}
diff --git a/lib/transport.h b/lib/transport.h
new file mode 100644 (file)
index 0000000..897b89c
--- /dev/null
@@ -0,0 +1,97 @@
+/* Access points into the dynamically loaded Rx transport module
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef _RXRPC_TRANSPORT_H
+#define _RXRPC_TRANSPORT_H
+
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include "rxrpc.h"
+
+enum rxrpc_recv_condition {
+       rxrpc_recv_more,
+       rxrpc_recv_final_ack,
+       rxrpc_recv_complete,
+       rxrpc_recv_complete_now_send,
+       rxrpc_recv_abort,
+       rxrpc_recv_net_error,
+       rxrpc_recv_local_error,
+       rxrpc_recv_busy,
+};
+
+#define rxrpc_transport_id "rxrpc-utils tran"
+
+struct rxrpc_transport_private;
+struct rxrpc_security_private;
+struct rxrpc_service_private;
+struct rxrpc_call_private;
+
+#if 0
+typedef void (*rxrpc_incoming_call_func_t)(struct rxrpc_service *service,
+                                          void *caller_data,
+                                          const struct sockaddr_rxrpc *peer,
+                                          socklen_t *peerlen);
+#endif
+
+struct rxrpc_transport {
+       char id[24];    /* = rxrpc_transport_id */
+       char name[16];  /* Name of transport */
+
+       bool (*create_local_endpoint)(struct rxrpc_local_endpoint *endpoint);
+       
+       void (*free_local_endpoint)(struct rxrpc_local_endpoint *endpoint);
+
+       bool (*new_security)(struct rxrpc_local_endpoint *endpoint,
+                            struct rxrpc_security *security,
+                            struct rxrpc_result *_result);
+
+       void (*free_security)(struct rxrpc_security *security);
+
+       bool (*make_call)(struct rxrpc_call *call,
+                         struct rxrpc_result *_result);
+
+#if 0
+       int (*new_service)(struct rxrpc_local_endpoint *endpoint,
+                          struct rxrpc_service *service,
+                          rxrpc_incoming_call_func_t new_call_func,
+                          void *caller_data,
+                          uint16_t local_port,
+                          uint16_t local_service,
+                          struct rxrpc_security_private **permits);
+
+       void (*shutdown_service)(struct rxrpc_service_private *service);
+       
+       int (*accept_call)(struct rxrpc_call *call,
+                          struct rxrpc_service *service,
+                          void *caller_data,
+                          struct sockaddr_rxrpc *_sa,
+                          socklen_t *_salen);
+#endif
+
+       void (*abort_call)(struct rxrpc_call *call,
+                          uint32_t abort_code);
+
+       void (*end_call)(struct rxrpc_call *call);
+
+       bool (*send_data)(struct rxrpc_call *call,
+                         struct rxrpc_buffer *buf,
+                         bool more,
+                         struct rxrpc_result *_result);
+
+       bool (*recv)(struct rxrpc_local_endpoint *endpoint,
+                    bool nowait,
+                    struct rxrpc_result *result);
+};
+
+extern struct rxrpc_transport af_rxrpc_transport;
+
+#endif /* _RXRPC_TRANSPORT_H */