*~
+*.o
+.*.o.d
+*.so
+lib/libkafs_utils.*
--- /dev/null
+all:
+ $(MAKE) -C lib
+
+clean:
+ $(RM) *~
+ $(MAKE) -C lib clean
+ $(MAKE) -C rxgen clean
--- /dev/null
+CFLAGS := -g -Wall -Wformat -fpic -O2 -Wno-pointer-arith
+
+LIB_SRCS := rxrpc_xdr.C af_rxrpc.C rxrpc_core.C
+LIB_OBJS := $(patsubst %.C,%.o,$(LIB_SRCS))
+
+%.o: %.C
+ $(CXX) $(CPPFLAGS) -MMD -MF .$@.d $(CFLAGS) -o $@ -c $<
+
+all: libkafs_utils.a
+
+libkafs_utils.a: $(LIB_OBJS)
+ $(AR) rcs -o $@ $(LIB_OBJS)
+
+DEPS := $(wildcard .*.o.d)
+ifneq ($(DEPS),)
+include $(DEPS)
+endif
+
+clean:
+ $(RM) *~ *.o *.so *.a $(DEPS)
--- /dev/null
+/* AF_RXRPC driver
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#define _XOPEN_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <errno.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <linux/rxrpc.h>
+#include "transport.H"
+
+using rxrpc::ref;
+class af_rxrpc_call;
+
+#define debug(fmt, ...) \
+ do { if (rxrpc::debug_transport) printf("AFRX: " fmt, ## __VA_ARGS__); } while (0)
+#define debug_buffer(fmt, ...) \
+ do { if (rxrpc::debug_buffers) printf("BUF: " fmt, ## __VA_ARGS__); } while (0)
+
+#define RXRPC_SELECT_CALL_FOR_RECV 7 /* Specify the call for recvmsg, SIOCINQ, etc. */
+#define RXRPC_SELECT_CALL_FOR_SEND 8 /* Specify the call for splice, SIOCOUTQ, etc. */
+
+#define RXRPC_SET_SECURITY_KEY 14
+#define RXRPC_SET_SECURITY_LEVEL 15
+
+/*
+ * Add a call ID to a control message sequence.
+ */
+#define RXRPC_ADD_CALLID(control, ctrllen, id) \
+do { \
+ unsigned char *__buffer = (control); \
+ unsigned long *__data; \
+ struct cmsghdr *__cmsg; \
+ __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \
+ __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \
+ __cmsg->cmsg_level = SOL_RXRPC; \
+ __cmsg->cmsg_type = RXRPC_USER_CALL_ID; \
+ __data = (unsigned long *)CMSG_DATA(__cmsg); \
+ *__data = (id); \
+ (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \
+} while (0)
+
+/*
+ * Add an abort instruction to a control message sequence.
+ */
+#define RXRPC_ADD_ABORT(control, ctrllen, abort_code) \
+do { \
+ void *__buffer = (control); \
+ unsigned int *__data; \
+ struct cmsghdr *__cmsg; \
+ __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \
+ __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \
+ __cmsg->cmsg_level = SOL_RXRPC; \
+ __cmsg->cmsg_type = RXRPC_ABORT; \
+ __data = (unsigned int *)CMSG_DATA(__cmsg); \
+ *__data = (abort_code); \
+ (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \
+} while (0)
+
+/*
+ * Add a security level instruction to a control message sequence.
+ */
+#define RXRPC_ADD_SECURITY_LEVEL(control, ctrllen, sec_level) \
+do { \
+ void *__buffer = (control); \
+ unsigned int *__data; \
+ struct cmsghdr *__cmsg; \
+ __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \
+ __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \
+ __cmsg->cmsg_level = SOL_RXRPC; \
+ __cmsg->cmsg_type = RXRPC_SET_SECURITY_LEVEL; \
+ __data = (void *)CMSG_DATA(__cmsg); \
+ *__data = (sec_level); \
+ (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \
+} while (0)
+
+/*
+ * Add a total transmit length instruction to a control message sequence.
+ */
+#define RXRPC_ADD_TX_LENGTH(control, ctrllen, length) \
+do { \
+ void *__buffer = (control); \
+ unsigned long long __data = (length); \
+ struct cmsghdr *__cmsg; \
+ __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \
+ __cmsg->cmsg_len = CMSG_LEN(sizeof(__data)); \
+ __cmsg->cmsg_level = SOL_RXRPC; \
+ __cmsg->cmsg_type = RXRPC_TX_LENGTH; \
+ memcpy(CMSG_DATA(__cmsg), &__data, sizeof(__data)); \
+ (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \
+} while (0)
+
+/*
+ * Add a miscellaneous option to a control message sequence.
+ */
+#define RXRPC_ADD_MISC_OPTION(control, ctrllen, opt) \
+do { \
+ void *__buffer = (control); \
+ struct cmsghdr *__cmsg; \
+ __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \
+ __cmsg->cmsg_len = CMSG_LEN(0); \
+ __cmsg->cmsg_level = SOL_RXRPC; \
+ __cmsg->cmsg_type = opt; \
+ (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \
+} while (0)
+
+/*
+ * RxRPC tranport definition.
+ */
+class af_rxrpc : public rxrpc::Transport {
+public:
+ std::string name() const;
+
+ ref<rxrpc::Endpoint> new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+ rxrpc::Security *security);
+
+ ref<rxrpc::Security> new_security(const std::string &key_name,
+ enum rxrpc::security_auth_level level);
+
+ ~af_rxrpc();
+};
+
+/*
+ * RxRPC local endpoint representation.
+ */
+class af_rxrpc_endpoint : public rxrpc::Endpoint {
+public:
+ struct sockaddr_rxrpc local;
+ int fd;
+
+ /* Message reception infrastructure */
+ std::list<rxrpc::Rx_buffer> rx_spare_buffers; /* Spare bufferage for receive */
+ unsigned int rx_nr_buffers; /* Number of buffers on the list */
+#define CONN_MSGS 3
+#define CONN_CMSG_SIZE 128
+#define CONN_NR_IOV 4
+ struct mmsghdr rx_msgs[CONN_MSGS]; /* Message headers */
+ struct sockaddr_rxrpc rx_peer[CONN_MSGS]; /* Mass peer address bufferage */
+ struct iovec rx_iov[CONN_MSGS * CONN_NR_IOV]; /* Mass data bufferage */
+ unsigned char rx_cmsg[CONN_MSGS * CONN_CMSG_SIZE]; /* Mass control message bufferage */
+
+ af_rxrpc_endpoint(const struct sockaddr_rxrpc *local_addr,
+ rxrpc::Security *security);
+ ~af_rxrpc_endpoint()
+ {
+ int i;
+ if (fd != -1)
+ close(fd);
+
+ for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++)
+ delete [] (unsigned char *)rx_iov[i].iov_base;
+ }
+
+ ref<rxrpc::Call> new_client_call(struct rxrpc::Call_params *params,
+ const char *op_name,
+ size_t size);
+ void receive(bool nowait);
+
+ ref<af_rxrpc_call> get_call_from_anciliary(struct msghdr *msg);
+ void recv_msg(struct msghdr *msg, unsigned int len);
+};
+
+/*
+ * AF_RXRPC security descriptor.
+ */
+class af_rxrpc_security : public rxrpc::Security {
+public:
+ unsigned int min_sec_level; /* RXRPC_SECURITY_* value */
+ af_rxrpc_security(const std::string &key_name,
+ enum rxrpc::security_auth_level level)
+ : Security(key_name, level)
+ {
+ min_sec_level = RXRPC_SECURITY_PLAIN;
+ }
+};
+
+/*
+ * AF_RXRPC call representation.
+ */
+class af_rxrpc_call : public rxrpc::Call {
+public:
+ bool known_to_kernel;
+
+ af_rxrpc_call(rxrpc::Call_params *params, const char *op_name, size_t size);
+ ~af_rxrpc_call();
+
+ void end(int32_t abort_code);
+ void abort(int32_t abort_code);
+ void send_data(rxrpc::Enc_buffer *buf, bool more);
+ void end_split(void);
+
+ void process_anciliary(struct msghdr *msg);
+ void recv_msg(struct msghdr *msg, unsigned int len);
+};
+
+/*
+ * dump the control messages
+ */
+static __attribute__((unused))
+void dump_cmsg(const char *dir, struct msghdr *msg)
+{
+ struct timespec tv;
+ struct cmsghdr *cmsg;
+ unsigned long long tx_len;
+ unsigned long user_id;
+ unsigned char *p;
+ int abort_code;
+ int n;
+
+ for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+ p = CMSG_DATA(cmsg);
+
+ debug("%s CMSG: %zu: ", dir, cmsg->cmsg_len);
+
+ switch (cmsg->cmsg_level) {
+ case SOL_SOCKET:
+ switch (cmsg->cmsg_type) {
+ case SO_TIMESTAMPNS_NEW:
+ printf("SO_TIMESTAMPNS_NEW: ");
+ if (n != sizeof(tv))
+ goto dump_data;
+ memcpy(&tv, p, sizeof(tv));
+ printf("%lu.%lu\n", tv.tv_sec, tv.tv_nsec);
+ continue;
+ default:
+ printf("SOCKET_%d", cmsg->cmsg_type);
+ break;
+ }
+ break;
+
+ case SOL_RXRPC:
+ switch (cmsg->cmsg_type) {
+ case RXRPC_USER_CALL_ID:
+ printf("RXRPC_USER_CALL_ID: ");
+ if (n != sizeof(user_id))
+ goto dump_data;
+ memcpy(&user_id, p, sizeof(user_id));
+ printf("%lx\n", user_id);
+ continue;
+
+ case RXRPC_ABORT:
+ printf("RXRPC_ABORT: ");
+ if (n != sizeof(abort_code))
+ goto dump_data;
+ memcpy(&abort_code, p, sizeof(abort_code));
+ printf("%d\n", abort_code);
+ continue;
+
+ case RXRPC_ACK:
+ printf("RXRPC_ACK");
+ if (n != 0)
+ goto dump_data_colon;
+ goto print_nl;
+
+ case RXRPC_NET_ERROR:
+ printf("RXRPC_NET_ERROR: ");
+ if (n != sizeof(abort_code))
+ goto dump_data;
+ memcpy(&abort_code, p, sizeof(abort_code));
+ printf("%s\n", strerror(abort_code));
+ continue;
+
+ case RXRPC_BUSY:
+ printf("RXRPC_BUSY");
+ if (n != 0)
+ goto dump_data_colon;
+ goto print_nl;
+
+ case RXRPC_LOCAL_ERROR:
+ printf("RXRPC_LOCAL_ERROR: ");
+ if (n != sizeof(abort_code))
+ goto dump_data;
+ memcpy(&abort_code, p, sizeof(abort_code));
+ printf("%s\n", strerror(abort_code));
+ continue;
+
+ case RXRPC_UPGRADE_SERVICE:
+ printf("RXRPC_UPGRADE_SERVICE\n");
+ continue;
+ case RXRPC_TX_LENGTH:
+ printf("RXRPC_TX_LENGTH: ");
+ if (n != sizeof(tx_len))
+ goto dump_data;
+ memcpy(&tx_len, p, sizeof(tx_len));
+ printf("%llu\n", tx_len);
+ continue;
+ case RXRPC_SET_CALL_TIMEOUT:
+ printf("RXRPC_SET_CALL_TIMEOUT\n");
+ continue;
+ default:
+ printf("RXRPC_%d", cmsg->cmsg_type);
+ break;
+ }
+ break;
+
+ default:
+ printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type);
+ break;
+ }
+
+
+ dump_data_colon:
+ printf(": ");
+ dump_data:
+ printf("{");
+ for (; n > 0; n--, p++)
+ printf("%02x", *p);
+
+ print_nl:
+ printf("}\n");
+ }
+}
+
+std::string af_rxrpc::name() const
+{
+ return "AF_RXRPC";
+}
+
+/*
+ * Construct a local endpoint.
+ */
+af_rxrpc_endpoint::af_rxrpc_endpoint(const struct sockaddr_rxrpc *local_addr,
+ rxrpc::Security *security)
+ : Endpoint(local_addr)
+{
+ int opt, i;
+
+ fd = -1;
+ rx_nr_buffers = 0;
+ memset(rx_msgs, 0, sizeof(rx_msgs));
+ memset(rx_peer, 0, sizeof(rx_peer));
+ memset(rx_iov, 0, sizeof(rx_iov));
+ memset(rx_cmsg, 0, sizeof(rx_cmsg));
+
+ /* Set up the message descriptors */
+ for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++)
+ rx_iov[i].iov_len = RXGEN_BUFFER_SIZE;
+
+ for (i = 0; i < CONN_MSGS; i++) {
+ struct mmsghdr *m = &rx_msgs[i];
+
+ m->msg_hdr.msg_iov = rx_iov + i * CONN_NR_IOV;
+ m->msg_hdr.msg_iovlen = CONN_NR_IOV;
+ m->msg_hdr.msg_name = &rx_peer[i];
+ m->msg_hdr.msg_namelen = sizeof(rx_peer[i]);
+ m->msg_hdr.msg_control = rx_cmsg;
+ m->msg_hdr.msg_controllen = CONN_CMSG_SIZE;
+ m->msg_hdr.msg_flags = 0;
+ }
+
+ /* Open up a socket for talking to the AF_RXRPC module */
+ fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET6);
+ if (fd < 0)
+ throw rxrpc::syserror("socket()");
+
+ try {
+ opt = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPNS_NEW, (char *)&opt, sizeof(opt)) == -1)
+ throw rxrpc::syserror("setsockopt(SO_TIMESTAMPNS_NEW)");
+
+ if (address.srx_family &&
+ bind(fd, (struct sockaddr *)&address, sizeof(address)) == -1)
+ throw rxrpc::syserror("bind");
+
+ if (security) {
+ af_rxrpc_security *sec = dynamic_cast<af_rxrpc_security *>(security);
+ if (sec->level != rxrpc::security_no_auth) {
+ debug("OPT RXRPC_SECURITY_KEY %s\n", security->key_name.c_str());
+ if (setsockopt(fd, SOL_RXRPC, RXRPC_SECURITY_KEY,
+ security->key_name.c_str(),
+ security->key_name.size()) < 0)
+ throw rxrpc::syserror("setsockopt(RXRPC_SECURITY_KEY)");
+
+ debug("OPT RXRPC_MIN_SECURITY_LEVEL %u\n", sec->min_sec_level);
+ if (setsockopt(fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
+ &sec->min_sec_level,
+ sizeof(sec->min_sec_level)) < 0)
+ throw rxrpc::syserror("setsockopt(RXRPC_MIN_SECURITY_LEVEL)");
+ }
+ }
+ } catch (...) {
+ close(fd);
+ throw;
+ }
+}
+
+/*
+ * Initialise the transport module
+ */
+ref<rxrpc::Endpoint> af_rxrpc::new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+ rxrpc::Security *security)
+{
+ ref<af_rxrpc_endpoint> endpoint;
+
+ if (local_addr &&
+ local_addr->transport_len) {
+ if (local_addr->transport_len > sizeof(local_addr->transport))
+ throw std::invalid_argument("Transport address too long");
+
+ switch (endpoint->address.transport.family) {
+ case AF_INET:
+ case AF_INET6:
+ break;
+ default:
+ throw std::invalid_argument("Unsupported transport type\n");
+ }
+ }
+
+ return new af_rxrpc_endpoint(local_addr, security);
+}
+
+/*
+ * Set up a security descriptor. Note that the key name can be NULL (for
+ * instance if -noauth was passed).
+ */
+ref<rxrpc::Security> af_rxrpc::new_security(const std::string &key_name,
+ enum rxrpc::security_auth_level level)
+{
+ ref<af_rxrpc_security> security;
+
+ security = new af_rxrpc_security(key_name, level);
+
+ switch (level) {
+ case rxrpc::security_no_auth:
+ break;
+ case rxrpc::security_local_auth:
+ throw std::invalid_argument("Local auth not supported by AF_RXRPC transport");
+ case rxrpc::security_unset:
+ case rxrpc::security_clear:
+ security->min_sec_level = RXRPC_SECURITY_PLAIN;
+ if (security->key_name.size() == 0)
+ throw std::invalid_argument("Unspecified key name");
+ break;
+ case rxrpc::security_integrity_only:
+ security->min_sec_level = RXRPC_SECURITY_AUTH;
+ if (security->key_name.size() == 0)
+ throw std::invalid_argument("Unspecified key name");
+ break;
+ case rxrpc::security_encrypt:
+ security->min_sec_level = RXRPC_SECURITY_ENCRYPT;
+ if (security->key_name.size() == 0)
+ throw std::invalid_argument("Unspecified key name");
+ break;
+ }
+
+ return security;
+}
+
+/*
+ * Construct a client call.
+ */
+af_rxrpc_call::af_rxrpc_call(rxrpc::Call_params *params, const char *op_name, size_t size)
+ : Call(params, op_name, size)
+{
+ known_to_kernel = false;
+}
+
+/*
+ * Allocate a call handle for a new call.
+ */
+ref <rxrpc::Call> af_rxrpc_endpoint::new_client_call(rxrpc::Call_params *params,
+ const char *op_name,
+ size_t size)
+{
+ return new af_rxrpc_call(params, op_name, size);
+}
+
+/*
+ * Dump data from an output buffer.
+ */
+static void dump_enc_buffers(rxrpc::Enc_buffer *&buf)
+{
+ unsigned int pos = 0, ioc, i;
+ bool nl = false;
+
+ for (ioc = 0; ioc < buf->ioc; ioc++) {
+ for (i = 0; i < buf->msg_iov[ioc].iov_len; i++) {
+ if (!(pos & 3)) {
+ if (!(pos & 31))
+ printf("%06x: ", pos);
+ else
+ printf(" ");
+ }
+
+ printf("%02x", ((unsigned char *)buf->msg_iov[ioc].iov_base)[i]);
+ pos++;
+ nl = false;
+ if ((pos & 31) == 0) {
+ printf("\n");
+ nl = true;
+ }
+ }
+ }
+
+ if (!nl)
+ printf("\n");
+}
+
+/*
+ * Send buffered data.
+ */
+void af_rxrpc_call::send_data(rxrpc::Enc_buffer *buf, bool more)
+{
+ af_rxrpc_endpoint *endpoint = this->endpoint.cast<af_rxrpc_endpoint>();
+ struct msghdr msg;
+ unsigned char control[128];
+ unsigned int flags = 0, tmp, i;
+ socklen_t olen;
+ ssize_t slen;
+ size_t ctrllen;
+ char abuf[512];
+
+ if (rxrpc::debug_transport)
+ printf("\n");
+
+ /* Switch into encode state */
+ switch (state) {
+ case call_cl_not_started:
+ case call_sv_processing:
+ state = (enum call_state)(state + 1);
+ break;
+ case call_cl_encoding_params:
+ case call_sv_encoding_response:
+ break;
+ default:
+ fprintf(stderr, "RxRPC: Send in bad call state (%d)\n", state);
+ ::abort();
+ }
+
+ /* Request an operation */
+ ctrllen = 0;
+ RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)(rxrpc::refcount *)this);
+ if (exclusive)
+ RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_EXCLUSIVE_CALL);
+ if (upgrade_service)
+ RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_UPGRADE_SERVICE);
+ if (total_tx_size) {
+ debug("total Tx size %llu\n", total_tx_size);
+ RXRPC_ADD_TX_LENGTH(control, ctrllen, total_tx_size);
+ }
+
+ msg.msg_name = &peer;
+ msg.msg_namelen = peer_len;
+ msg.msg_iov = buf ? buf->msg_iov : NULL;
+ msg.msg_iovlen = buf ? buf->ioc : 0;
+ msg.msg_control = control;
+ msg.msg_controllen = ctrllen;
+ msg.msg_flags = 0;
+
+ if (more)
+ flags |= MSG_MORE;
+
+ switch (peer.transport.family) {
+ case AF_INET:
+ inet_ntop(peer.transport.family, &peer.transport.sin.sin_addr,
+ abuf, sizeof(abuf));
+ debug("Tx %u Addr %s:%u\n",
+ peer.srx_service, abuf, ntohs(peer.transport.sin.sin_port));
+ break;
+ case AF_INET6:
+ inet_ntop(peer.transport.family, &peer.transport.sin6.sin6_addr,
+ abuf, sizeof(abuf));
+ debug("Tx %u Addr [%s]:%u\n",
+ peer.srx_service, abuf, ntohs(peer.transport.sin6.sin6_port));
+ break;
+ }
+
+ if (rxrpc::debug_transport) {
+ dump_cmsg("Tx", &msg);
+ for (i = 0; i < msg.msg_iovlen; i++)
+ debug("Tx IOV[%02u] %04zu %p\n",
+ i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base);
+ }
+ if (rxrpc::debug_data)
+ dump_enc_buffers(buf);
+
+ olen = sizeof(tmp);
+ getsockopt(endpoint->fd, SOL_SOCKET, SO_ERROR, (char *)&tmp, &olen);
+
+ /* Send the data */
+ slen = sendmsg(endpoint->fd, &msg, flags);
+ if (slen < 0) {
+ debug("SENDMSG: %m\n");
+ throw rxrpc::syserror("sendmsg");
+ }
+
+ debug("SENDMSG: %zd%s\n", slen, more ? " [more]" : "");
+ known_to_kernel = true;
+
+ if (!more) {
+ state = (enum call_state)(state + 1);
+ if (state == call_cl_encoding_params ||
+ state == call_sv_encoding_response)
+ state = (enum call_state)(state + 1);
+ }
+}
+
+/*
+ * End a split phase.
+ */
+void af_rxrpc_call::end_split(void)
+{
+ switch (state) {
+ case call_cl_encoding_params: {
+ send_data(NULL, false);
+ break;
+ }
+
+ case call_cl_receiving_response:
+ dec_func = rxrpc::receive_reply;
+ dec_func(this);
+ break;
+
+ default:
+ fprintf(stderr, "rxrpc::end_split() called in state %u\n",
+ state);
+ return;
+ }
+}
+
+/*
+ * Determine the call a received message applies to by examining the anciliary
+ * data from recvmsg().
+ */
+ref<af_rxrpc_call> af_rxrpc_endpoint::get_call_from_anciliary(struct msghdr *msg)
+{
+ ref<af_rxrpc_call> call;
+ struct cmsghdr *cmsg;
+ unsigned long tmp;
+
+ if (rxrpc::debug_transport)
+ dump_cmsg("Rx", msg);
+
+ for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ unsigned char *p = CMSG_DATA(cmsg);
+ int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+
+ if (cmsg->cmsg_level != SOL_RXRPC)
+ continue;
+
+ switch (cmsg->cmsg_type) {
+ case RXRPC_USER_CALL_ID:
+ if (n != sizeof(tmp)) {
+ fprintf(stderr, "User call ID wrong size\n");
+ abort();
+ }
+
+ memcpy(&tmp, p, sizeof(tmp));
+ call.assign_cast((rxrpc::refcount *)tmp);
+ return call;
+ }
+ }
+
+ throw std::runtime_error("recvmsg() didn't include a RXRPC_USER_CALL_ID cmsg");
+}
+
+/*
+ * Process the metadata given to us by recvmsg().
+ */
+void af_rxrpc_call::process_anciliary(struct msghdr *msg)
+{
+ struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *)msg->msg_name;
+ struct cmsghdr *cmsg;
+
+ if (service_id != srx->srx_service)
+ debug("Upgraded service to %u\n", srx->srx_service);
+ service_id = srx->srx_service; /* Changes if service upgrade */
+
+ if ((msg->msg_flags & MSG_EOR) && state != call_completed) {
+ debug("Rx EOR\n");
+ known_to_kernel = false;
+ state = call_completed;
+ completed = call_success;
+ }
+
+ for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ unsigned char *p = CMSG_DATA(cmsg);
+ int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+
+ if (cmsg->cmsg_level == SOL_SOCKET) {
+ switch (cmsg->cmsg_type) {
+ case SO_TIMESTAMPNS_NEW:
+ if (have_rx_timestamp)
+ continue;
+
+ if (n != sizeof(rx_timestamp)) {
+ fprintf(stderr, "SO_TIMESTAMPNS wrong size\n");
+ std::abort();
+ }
+
+ memcpy(&rx_timestamp, p, n);
+ have_rx_timestamp = true;
+ debug("Rx tstamp\n");
+ break;
+ }
+ }
+
+ if (cmsg->cmsg_level != SOL_RXRPC)
+ continue;
+
+ switch (cmsg->cmsg_type) {
+ case RXRPC_USER_CALL_ID:
+ break;
+
+ case RXRPC_ABORT:
+ if (n == 4)
+ memcpy(&abort_code, p, 4);
+ error = ECONNABORTED;
+ state = call_completed;
+ completed = call_remotely_aborted;
+ break;
+
+ case RXRPC_NET_ERROR:
+ if (n != sizeof(error))
+ throw std::runtime_error("Can't parse RXRPC_NET_ERROR cmsg");
+ memcpy(&error, p, 4);
+ state = call_completed;
+ completed = call_network_error;
+ break;
+
+ case RXRPC_LOCAL_ERROR:
+ if (n != sizeof(error))
+ throw std::runtime_error("Can't parse RXRPC_LOCAL_ERROR cmsg");
+ memcpy(&error, p, 4);
+ state = call_completed;
+ completed = call_local_error;
+ break;
+
+ case RXRPC_BUSY:
+ error = ECONNREFUSED;
+ state = call_completed;
+ completed = call_rejected_busy;
+ break;
+
+ case RXRPC_ACK:
+ if (state != call_sv_waiting_for_final_ack) {
+ fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n",
+ state);
+ std::abort();
+ }
+ state = call_completed;
+ completed = call_success;
+ break;
+
+ default:
+ fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type);
+ break;
+ }
+ }
+
+ if (state == call_completed) {
+ if (completion_func) {
+ completion_func(this);
+ completion_func = NULL;
+ } else {
+ switch (completed) {
+ case call_remotely_aborted:
+ throw_abort_func(abort_code, op_name);
+ case call_network_error:
+ throw rxrpc::network_error(error);
+ case call_local_error:
+ case call_rejected_busy:
+ throw rxrpc::syserror(error);
+ default:
+ break;
+ }
+ }
+ }
+
+ debug("<--af_rxrpc_call::process_anciliary()\n");
+}
+
+/*
+ * Process a message from a call.
+ */
+void af_rxrpc_call::recv_msg(struct msghdr *msg, unsigned int len)
+{
+ af_rxrpc_endpoint *endpoint = this->endpoint.cast<af_rxrpc_endpoint>();
+ struct iovec *iov = msg->msg_iov;
+ unsigned int tmp;
+ int i;
+
+ process_anciliary(msg);
+
+ if (len > CONN_NR_IOV * RXGEN_BUFFER_SIZE) {
+ fprintf(stderr, "Too much data returned\n");
+ ::abort();
+ }
+
+ if (rxrpc::debug_data) {
+ tmp = len;
+ for (i = 0; i < CONN_NR_IOV && tmp > 0; i++) {
+ unsigned int seg = (tmp <= iov[i].iov_len) ? tmp : iov[i].iov_len;
+ bool nl = false;
+ printf("Rx IOV[%u] %zu %u %p\n", i, iov[i].iov_len, seg, iov[i].iov_base);
+
+ unsigned int j, s = seg; // <= 32 ? seg : 32;
+ for (j = 0; j < s; j++) {
+ if (!(j & 3)) {
+ if (!(j & 31))
+ printf("%06x: ", j);
+ else
+ printf(" ");
+ }
+ printf("%02x", ((unsigned char *)iov[i].iov_base)[j]);
+ nl = false;
+ if ((j & 31) == 31) {
+ printf("\n");
+ nl = true;
+ }
+ }
+ if (!nl)
+ printf("\n");
+ tmp -= seg;
+ }
+ }
+
+ /* Update the buffers with the amount of data stored. */
+ for (i = 0; i < CONN_NR_IOV && len > 0; i++) {
+ rxrpc::Rx_buffer &b = endpoint->rx_spare_buffers.front();
+ unsigned int n;
+
+ b.index = dec_index++;
+
+ n = RXGEN_BUFFER_SIZE;
+ if (n > len)
+ n = len;
+ debug("BUF[%u] +%-4u %zu {%u}\n",
+ b.index, n, dec_amount + n, endpoint->rx_nr_buffers);
+ b.len = n;
+ b.buf = (unsigned char *)iov[i].iov_base;
+ iov[i].iov_base = NULL;
+
+ endpoint->rx_nr_buffers--;
+ std::list<rxrpc::Rx_buffer>::iterator to = endpoint->rx_spare_buffers.begin();
+ to++;
+ dec_buffers.splice(dec_buffers.end(),
+ endpoint->rx_spare_buffers,
+ endpoint->rx_spare_buffers.begin(),
+ to);
+ dec_amount += n;
+ len -= n;
+ }
+
+ dec_func(this);
+}
+
+/*
+ * Process a message.
+ */
+void af_rxrpc_endpoint::recv_msg(struct msghdr *msg, unsigned int len)
+{
+ af_rxrpc_call *call = get_call_from_anciliary(msg);
+
+ call->recv_msg(msg, len);
+}
+
+/*
+ * Receive data from a variety of calls in one go.
+ */
+void af_rxrpc_endpoint::receive(bool nowait)
+{
+ struct iovec *iov = rx_iov;
+ int count, i, j;
+
+ if (rxrpc::debug_transport)
+ printf("\n");
+
+ /* Tamp down the buffers to make sure we cycle through the memory */
+ j = 0;
+ for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) {
+ if (!iov[i].iov_base)
+ continue;
+ iov[j++].iov_base = iov[i].iov_base;
+ }
+
+ for (; j < CONN_MSGS * CONN_NR_IOV; j++)
+ iov[j].iov_base = new unsigned char[RXGEN_BUFFER_SIZE];
+
+ /* Make sure we have sufficient buffer descriptors */
+ while (rx_nr_buffers < CONN_MSGS * CONN_NR_IOV) {
+ rx_spare_buffers.emplace_back();
+ rx_nr_buffers++;
+ }
+
+ /* Receive messages. The descriptor array is already set up and gets
+ * reset as messages are processed.
+ */
+ count = recvmmsg(fd, rx_msgs, CONN_MSGS,
+ nowait ? MSG_DONTWAIT : MSG_WAITFORONE,
+ NULL);
+ debug("RECVMMSG: %d\n", count);
+ if (count == -1) {
+ if (errno != ENODATA)
+ throw rxrpc::syserror("recvmmsg");
+ return;
+ }
+
+ for (i = 0; i < count; i++) {
+ struct mmsghdr *m = &rx_msgs[i];
+ struct msghdr *msg = &m->msg_hdr;
+
+ debug("Rx DATA: %d [fl:%x]\n", m->msg_len, msg->msg_flags);
+
+ recv_msg(msg, m->msg_len);
+
+ /* Reset the message descriptor. */
+ m->msg_hdr.msg_namelen = sizeof(rx_peer[i]);
+ m->msg_hdr.msg_controllen = CONN_CMSG_SIZE;
+ m->msg_hdr.msg_flags = 0;
+ }
+}
+
+/*
+ * Abort a call.
+ */
+void af_rxrpc_call::abort(int32_t abort_code)
+{
+ af_rxrpc_endpoint *endpoint = this->endpoint.cast<af_rxrpc_endpoint>();
+ struct msghdr msg;
+ size_t ctrllen;
+ unsigned char control[128];
+
+ if (known_to_kernel) {
+ memset(control, 0, sizeof(control));
+ ctrllen = 0;
+ RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)(rxrpc::refcount *)this);
+ RXRPC_ADD_ABORT(control, ctrllen, abort_code);
+
+ msg.msg_name = &peer;
+ msg.msg_namelen = peer_len;
+ msg.msg_iov = NULL;
+ msg.msg_iovlen = 0;
+ msg.msg_control = control;
+ msg.msg_controllen = ctrllen;
+ msg.msg_flags = 0;
+
+ sendmsg(endpoint->fd, &msg, 0);
+ state = call_completed;
+ completed = call_locally_aborted;
+ }
+}
+
+/*
+ * End a call, aborting it if necessary.
+ */
+void af_rxrpc_call::end(int32_t abort_code)
+{
+ if (state != call_completed)
+ abort(abort_code);
+}
+
+/*
+ * Destroy a call.
+ */
+af_rxrpc_call::~af_rxrpc_call()
+{
+ if (known_to_kernel) {
+ fprintf(stderr, "Call still known to kernel\n");
+ end(RX_USER_ABORT);
+ }
+}
+
+af_rxrpc::~af_rxrpc()
+{
+}
+
+ref<rxrpc::Transport> create_af_rxrpc_transport()
+{
+ return new af_rxrpc();
+}
--- /dev/null
+/* Core definitions.
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef RXRPC_HH
+#define RXRPC_HH
+
+#include <climits>
+#include <exception>
+#include <stdexcept>
+#include <string>
+#include <vector>
+#include <list>
+#include <cstring>
+#include <cerrno>
+#include <system_error>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <arpa/inet.h>
+#include <uuid/uuid.h>
+#include <linux/rxrpc.h>
+#include "pointer.H"
+
+namespace rxrpc {
+
+class Call;
+
+extern bool debug_buffers;
+extern bool debug_core;
+extern bool debug_data;
+extern bool debug_ops;
+extern bool debug_transport;
+extern bool debug_xdr;
+
+typedef unsigned int net_xdr_t;
+
+struct xdr_u64 { net_xdr_t hi, lo; };
+
+enum security_auth_level {
+ security_unset,
+ security_no_auth,
+ security_local_auth,
+ security_clear,
+ security_integrity_only,
+ security_encrypt,
+};
+
+enum error_source {
+ error_none,
+ error_invalid_argument,
+ error_remote_abort,
+ error_peer_busy,
+ error_from_system,
+ error_from_network,
+ error_from_parameters,
+};
+
+/*
+ * Local transport endpoint.
+ */
+class Endpoint : public refcount {
+public:
+ struct sockaddr_rxrpc address;
+
+ Endpoint(const struct sockaddr_rxrpc *local_addr)
+ {
+ if (local_addr)
+ memcpy(&address, local_addr, sizeof(address));
+ else
+ memset(&address, 0, sizeof(address));
+ }
+
+ virtual ~Endpoint() = 0;
+ virtual void receive(bool nowait=false) = 0;
+ virtual ref<Call> new_client_call(struct Call_params *params,
+ const char *op_name,
+ size_t size) = 0;
+};
+
+/*
+ * Security details to use.
+ */
+class Security : public refcount {
+public:
+ std::string key_name; /* Name of key to use */
+ enum security_auth_level level;
+ Security(const std::string &_key_name,
+ enum rxrpc::security_auth_level _level)
+ : key_name(_key_name), level(_level)
+ {}
+ virtual ~Security();
+};
+
+/*
+ * Call parameters.
+ */
+struct Call_params {
+ ref<Endpoint> endpoint;
+ ref<Security> security;
+ struct sockaddr_rxrpc peer;
+ socklen_t peer_len;
+ bool exclusive;
+ bool upgrade_service;
+ unsigned short service_id_used;
+
+ Call_params()
+ {
+ memset(&peer, 0, sizeof(peer));
+ peer_len = 0;
+ exclusive = false;
+ upgrade_service = false;
+ service_id_used = 0;
+ }
+};
+
+class Resv {
+public:
+ unsigned int max_ioc; /* Size of msg_iov[] */
+ bool flat; /* T to do it all in one iovec */
+ size_t alloc; /* Size of buffer allocated */
+ size_t additional; /* Size of additional content */
+};
+
+/*
+ * RxRPC encoded data buffer.
+ */
+class Enc_buffer : public refcount {
+public:
+ net_xdr_t *enc_buf; /* Encoding buffer */
+ net_xdr_t *enc_buf_p; /* Running pointer into encoding buffer */
+ net_xdr_t *enc_buf_end; /* Encoding buffer limit */
+ struct iovec *msg_iov; /* For sendmsg()'s msghdr */
+ unsigned int ioc; /* Occupancy of msg_iov[] */
+ bool flat; /* T to do it all in one iovec */
+ size_t size; /* Size of content, including additional iovecs */
+ Enc_buffer()
+ {
+ enc_buf = NULL;
+ enc_buf_p = NULL;
+ enc_buf_end = NULL;
+ msg_iov = NULL;
+ ioc = 0;
+ flat = false;
+ size = 0;
+ }
+ ~Enc_buffer();
+};
+
+/*
+ * RxRPC received data buffer.
+ */
+class Rx_buffer {
+public:
+#define RXGEN_BUFFER_SIZE 1024
+ unsigned char *buf;
+ unsigned short len; /* Amount of data in buffer */
+ unsigned short used; /* How much data used */
+ unsigned int index; /* Index in buffer list */
+ bool no_delete; /* Don't delete the buffer */
+
+ Rx_buffer()
+ {
+ buf = NULL;
+ len = 0;
+ used = 0;
+ index = 0;
+ no_delete = false;
+ }
+ ~Rx_buffer()
+ {
+ if (!no_delete)
+ delete [] buf;
+ buf = NULL;
+ }
+};
+
+/*
+ * Received data queue.
+ */
+class Rx_queue {
+public:
+ const char *op_name; /* Name of the RPC operation */
+ bool client_side; /* True if client side of call */
+ unsigned int dec_index;
+ std::list<Rx_buffer> dec_buffers; /* List of received buffers */
+ size_t dec_amount; /* Amount of data in dec_buffers */
+
+ Rx_queue(const char *opname = NULL);
+ ~Rx_queue();
+ static void read_copy(void *buffer, size_t offset, const void *data, size_t len);
+ void read(void *buffer, size_t buffer_size,
+ void (*copy)(void *buffer, size_t offset, const void *data, size_t len)
+ = read_copy);
+ virtual void overrun();
+};
+
+/*
+ * RxRPC call.
+ */
+class Call : public refcount, public Rx_queue {
+public:
+ enum call_state {
+ call_cl_not_started,
+ call_cl_encoding_params,
+ call_cl_receiving_response,
+
+ call_sv_not_started,
+ call_sv_receiving_params,
+ call_sv_wait_for_no_MSG_MORE,
+ call_sv_processing,
+ call_sv_encoding_response_split,
+ call_sv_encoding_response,
+ call_sv_response_encoded,
+ call_sv_waiting_for_final_ack,
+ call_completed,
+ };
+
+ enum call_completion {
+ call_incomplete,
+ call_success,
+ call_remotely_aborted,
+ call_locally_aborted,
+ call_network_error,
+ call_local_error,
+ call_rejected_busy,
+ };
+
+ ref<Endpoint> endpoint;
+ struct sockaddr_rxrpc peer;
+ socklen_t peer_len;
+ unsigned short service_id;
+ enum call_state state;
+ enum call_completion completed;
+ uint32_t abort_code;
+ unsigned int error;
+
+ bool split; /* True if split-mode call */
+ bool exclusive;
+ bool upgrade_service;
+ bool have_rx_timestamp;
+
+ struct timespec rx_timestamp; /* Timestamp of first received packet */
+
+ void (*completion_func)(Call *);
+ void (*throw_abort_func)(int ac, const char *op);
+
+ /* Transmission phase state */
+ unsigned long long total_tx_size; /* Total expected transmission size */
+ uint32_t encode_error;
+
+ /* Receive phase state */
+ void (*dec_func)(Call *);
+
+ Call();
+ Call(struct Call_params *params, const char *op_name, size_t size);
+ virtual ~Call();
+
+ virtual void send_data(Enc_buffer *buf, bool more=false) = 0;
+ virtual void end(int32_t abort_code=0) = 0;
+ virtual void abort(int32_t abort_code) = 0;
+ virtual void end_split(void) = 0;
+
+ void dec_underrun(size_t);
+ void request_received();
+};
+
+extern void find_transport(void);
+extern ref<Endpoint> new_local_endpoint(const struct sockaddr_rxrpc *local_sa = NULL,
+ rxrpc::Security *security = NULL);
+extern ref<Security> new_security(const std::string &key_name,
+ enum security_auth_level level);
+
+extern ref<Enc_buffer> alloc_enc_buffer(Resv &resv);
+extern ref<Call> make_call(Call_params *z_params,
+ const char *op_name,
+ Enc_buffer *z_request);
+extern void receive_reply(Call *call);
+
+/*
+ * UUID.
+ */
+struct Uuid {
+ uuid_t uuid;
+ Uuid() { memset(uuid, 0, sizeof(uuid)); }
+};
+
+/*
+ * Opaque data.
+ */
+struct Opaque {
+ size_t buffer_size;
+ void *buffer;
+ bool del_buffer;
+
+ inline Opaque(void)
+ {
+ buffer_size = 0;
+ buffer = NULL;
+ del_buffer = false;
+ }
+ inline ~Opaque(void)
+ {
+ if (del_buffer)
+ free(buffer);
+ }
+ inline size_t size(void) const { return buffer_size; }
+ inline void reserve(size_t size)
+ {
+ if (buffer && del_buffer)
+ free(buffer);
+ buffer = malloc(size + 1);
+ ((unsigned char *)buffer)[size] = 0;
+ del_buffer = true;
+ }
+};
+
+/*
+ * XDR encoding size calculation routines
+ */
+static inline void reserve(Resv &resv)
+{
+ resv.alloc += 4;
+}
+
+static inline void reserve_a32(Resv &resv, unsigned int n)
+{
+ resv.alloc += 4 * n;
+}
+
+static inline void reserve_u64(Resv &resv)
+{
+ resv.alloc += 8;
+}
+
+static inline void reserve_a64(Resv &resv, unsigned int n)
+{
+ resv.alloc += 8 * n;
+}
+
+static inline size_t xdr_round_up(unsigned int size)
+{
+ return (size + sizeof(net_xdr_t) - 1) & ~(sizeof(net_xdr_t) - 1);
+}
+
+static inline bool reserve_opaque_inline(bool flat, unsigned int size, unsigned int max)
+{
+ return flat || (max <= 1024 && size <= 1024);
+}
+
+static inline void reserve_opaque(Resv &resv, const Opaque &data,
+ unsigned int max)
+{
+ size_t size = data.size();
+ size_t rs = xdr_round_up(size);
+
+ reserve(resv);
+ if (reserve_opaque_inline(resv.flat, size, max)) {
+ resv.alloc += rs;
+ } else {
+ if ((size & 3) == 0) {
+ resv.additional += rs;
+ } else {
+ resv.alloc += 4;
+ resv.additional += rs - 4;
+ }
+ resv.max_ioc += 2;
+ }
+}
+
+static inline void reserve_string(Resv &resv, const std::string &s, unsigned int max)
+{
+ Opaque op;
+
+ op.buffer_size = s.size(),
+ reserve_opaque(resv, op, max);
+}
+
+static inline void reserve_opr_uuid(Resv &resv, const Uuid &uuid)
+{
+ resv.alloc += 16;
+}
+
+/*
+ * XDR encoding routines
+ */
+extern void enc_data(Enc_buffer *buf, const void *data,
+ unsigned int len, unsigned int max);
+
+static inline void enc(Enc_buffer *z_buf, uint32_t val)
+{
+ *z_buf->enc_buf_p++ = htonl(val);
+}
+
+static inline void enc_u64(Enc_buffer *z_buf, uint64_t x)
+{
+ enc(z_buf, x >> 32);
+ enc(z_buf, x);
+}
+
+static inline void enc_string(Enc_buffer *buf, const std::string &s, unsigned int max)
+{
+ enc_data(buf, s.c_str(), s.size(), max);
+}
+
+static inline void enc_opaque(Enc_buffer *buf, const Opaque &data, unsigned int max)
+{
+ enc_data(buf, data.buffer, data.size(), max);
+}
+
+static inline void enc_block(Enc_buffer *buf, const void *p, size_t s)
+{
+ memcpy(buf->enc_buf_p, p, s);
+ buf->enc_buf_p += s / 4;
+}
+
+static inline void enc_uuid(Enc_buffer *buf, const Uuid &u)
+{
+ enc_block(buf, u.uuid, sizeof(u.uuid));
+}
+
+static inline void seal_buffer(Enc_buffer *buf)
+{
+ unsigned int len;
+
+ /* call->ioc points to the iovec being filled, so we need to advance it
+ * if that has anything in it.
+ */
+ len = (unsigned long)buf->enc_buf_p - (unsigned long)buf->msg_iov[buf->ioc].iov_base;
+ if (len) {
+ buf->msg_iov[buf->ioc].iov_len = len;
+ buf->ioc++;
+ }
+}
+
+/*
+ * XDR decoding routines
+ */
+extern uint32_t dec(Rx_queue &rxq);
+extern uint32_t dec(Rx_queue &rxq, size_t limit);
+extern void dec_string(Rx_queue &rxq, std::string &, size_t max = UINT_MAX);
+extern void dec_opaque(Rx_queue &rxq, Opaque &, size_t max = UINT_MAX);
+extern void *dec_blob(Rx_queue &rxq, size_t size);
+
+static inline uint64_t dec_u64(Rx_queue &rxq)
+{
+ uint64_t x;
+ x = (uint64_t)dec(rxq) << 32;
+ x |= (uint64_t)dec(rxq);
+ return x;
+}
+
+/*
+ * Error handling.
+ */
+struct nomem : std::exception {
+};
+
+class syserror : public std::system_error {
+protected:
+ syserror() {}
+public:
+ syserror(const char *s)
+ : system_error(std::error_code(errno, std::generic_category()), s)
+ {
+ }
+ syserror(const std::string &s)
+ : system_error(std::error_code(errno, std::generic_category()), s)
+ {
+ }
+ syserror(int err, const char *s)
+ : system_error(std::error_code(err, std::generic_category()), s)
+ {
+ }
+ syserror(int err, const std::string &s)
+ : system_error(std::error_code(err, std::generic_category()), s)
+ {
+ }
+ syserror(int err)
+ : system_error(std::error_code(err, std::generic_category()))
+ {
+ }
+};
+
+class network_error : public syserror {
+public:
+ __attribute__((format(printf,2,3)))
+ network_error(const char *fmt, ...)
+ {
+ }
+ network_error(int err)
+ : syserror(err)
+ {
+ }
+};
+
+class rx_abort : public std::runtime_error {
+protected:
+ int abort_code;
+ const char *abort_name;
+ const char *op_name;
+
+ rx_abort(int abort, const char *name, const char *desc, const char *op);
+public:
+ int get_abort_code() const { return abort_code; }
+};
+
+class unknown_abort : public rx_abort {
+public:
+ unknown_abort(int abort_code, const char *op = NULL)
+ : rx_abort(abort_code, "UnknownAbort", "", op) {}
+};
+
+struct AbortRx : rx_abort {
+ AbortRx(int ac, const char *sym, const char *desc, const char *op)
+ : rx_abort(ac, sym, desc, op) {}
+};
+struct AbortRX_CALL_DEAD : AbortRx {
+ AbortRX_CALL_DEAD(const char *op = NULL)
+ : AbortRx(RX_CALL_DEAD, "RX_CALL_DEAD", "", op) {}
+};
+struct AbortRX_INVALID_OPERATION : AbortRx {
+ AbortRX_INVALID_OPERATION(const char *op = NULL)
+ : AbortRx(RX_INVALID_OPERATION, "RX_INVALID_OPERATION", "", op) {}
+};
+struct AbortRX_CALL_TIMEOUT : AbortRx {
+ AbortRX_CALL_TIMEOUT(const char *op = NULL)
+ : AbortRx(RX_CALL_TIMEOUT, "RX_CALL_TIMEOUT", "", op) {}
+};
+struct AbortRX_EOF : AbortRx {
+ AbortRX_EOF(const char *op = NULL)
+ : AbortRx(RX_EOF, "RX_EOF", "", op) {}
+};
+struct AbortRX_PROTOCOL_ERROR : AbortRx {
+ AbortRX_PROTOCOL_ERROR(const char *op = NULL)
+ : AbortRx(RX_PROTOCOL_ERROR, "RX_PROTOCOL_ERROR", "", op) {}
+};
+struct AbortRX_USER_ABORT : AbortRx {
+ AbortRX_USER_ABORT(const char *op = NULL)
+ : AbortRx(RX_USER_ABORT, "RX_USER_ABORT", "", op) {}
+};
+struct AbortRX_ADDRINUSE : AbortRx {
+ AbortRX_ADDRINUSE(const char *op = NULL)
+ : AbortRx(RX_ADDRINUSE, "RX_ADDRINUSE", "", op) {}
+};
+struct AbortRX_DEBUGI_BADTYPE : AbortRx {
+ AbortRX_DEBUGI_BADTYPE(const char *op = NULL)
+ : AbortRx(RX_DEBUGI_BADTYPE, "RX_DEBUGI_BADTYPE", "", op) {}
+};
+
+struct AbortRXGEN_CC_MARSHAL : AbortRx {
+ AbortRXGEN_CC_MARSHAL(const char *op = NULL)
+ : AbortRx(RXGEN_CC_MARSHAL, "RXGEN_CC_MARSHAL", "", op) {}
+};
+struct AbortRXGEN_CC_UNMARSHAL : AbortRx {
+ AbortRXGEN_CC_UNMARSHAL(const char *op = NULL)
+ : AbortRx(RXGEN_CC_UNMARSHAL, "RXGEN_CC_UNMARSHAL", "", op) {}
+};
+struct AbortRXGEN_SS_MARSHAL : AbortRx {
+ AbortRXGEN_SS_MARSHAL(const char *op = NULL)
+ : AbortRx(RXGEN_SS_MARSHAL, "RXGEN_SS_MARSHAL", "", op) {}
+};
+struct AbortRXGEN_SS_UNMARSHAL : AbortRx {
+ AbortRXGEN_SS_UNMARSHAL(const char *op = NULL)
+ : AbortRx(RXGEN_SS_UNMARSHAL, "RXGEN_SS_UNMARSHAL", "", op) {}
+};
+struct AbortRXGEN_DECODE : AbortRx {
+ AbortRXGEN_DECODE(const char *op = NULL)
+ : AbortRx(RXGEN_DECODE, "RXGEN_DECODE", "", op) {}
+};
+struct AbortRXGEN_OPCODE : AbortRx {
+ AbortRXGEN_OPCODE(const char *op = NULL)
+ : AbortRx(RXGEN_OPCODE, "RXGEN_OPCODE", "", op) {}
+};
+struct AbortRXGEN_SS_XDRFREE : AbortRx {
+ AbortRXGEN_SS_XDRFREE(const char *op = NULL)
+ : AbortRx(RXGEN_SS_XDRFREE, "RXGEN_SS_XDRFREE", "", op) {}
+};
+struct AbortRXGEN_CC_XDRFREE : AbortRx {
+ AbortRXGEN_CC_XDRFREE(const char *op = NULL)
+ : AbortRx(RXGEN_CC_XDRFREE, "RXGEN_CC_XDRFREE", "", op) {}
+};
+
+extern void throw_abort(int abort_code, const char *op=NULL);
+
+} /* end namespace rxrpc */
+
+#endif /* RXRPC_HH */
--- /dev/null
+/* rxrpc core
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <iostream>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <cerrno>
+#include "rxrpc.H"
+#include "transport.H"
+
+using rxrpc::ref;
+
+bool rxrpc::debug_buffers; /* Debug buffer handling */
+bool rxrpc::debug_core; /* Debug core management */
+bool rxrpc::debug_data; /* Dump data */
+bool rxrpc::debug_ops; /* Debug operations */
+bool rxrpc::debug_transport; /* Debug transport */
+bool rxrpc::debug_xdr; /* Debug XDR encoding/decoding */
+
+#define debug(fmt, ...) \
+ do { if (rxrpc::debug_core) printf("CORE: " fmt, ## __VA_ARGS__); } while (0)
+#define debug_op(fmt, ...) \
+ do { if (rxrpc::debug_ops) printf("OP: " fmt, ## __VA_ARGS__); } while (0)
+
+rxrpc::refcount::~refcount() {}
+rxrpc::Security::~Security() {}
+rxrpc::Transport::~Transport() {}
+
+static ref<rxrpc::Transport> rxrpc_transport;
+
+void rxrpc::find_transport(void)
+{
+ if (!rxrpc_transport)
+ rxrpc_transport = create_af_rxrpc_transport();
+}
+
+/*
+ * Set up a new transport
+ */
+ref<rxrpc::Endpoint> rxrpc::new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+ rxrpc::Security *security)
+{
+ if (!rxrpc_transport)
+ throw std::invalid_argument("No Rx transport loaded");
+
+ return rxrpc_transport->new_local_endpoint(local_addr, security);
+}
+
+/*
+ * Destroy a local endpoint.
+ */
+rxrpc::Endpoint::~Endpoint()
+{
+}
+
+/*
+ * Set up a new security context.
+ */
+ref<rxrpc::Security> rxrpc::new_security(const std::string &key_name,
+ enum rxrpc::security_auth_level level)
+{
+ if (level < security_unset ||
+ level > security_encrypt)
+ throw std::invalid_argument("Invalid security level");
+
+ return rxrpc_transport->new_security(key_name, level);
+}
+
+/*
+ * Construct a server call..
+ */
+rxrpc::Call::Call()
+{
+ memset(&peer, 0, sizeof(peer));
+ peer_len = 0;
+ service_id = 0;
+ state = call_cl_not_started;
+ completed = call_incomplete;
+ abort_code = 0;
+ error = 0;
+ client_side = false;
+ split = false;
+ exclusive = false;
+ upgrade_service = false;
+ have_rx_timestamp = false;
+ memset(&rx_timestamp, 0, sizeof(rx_timestamp));
+ completion_func = NULL;
+ throw_abort_func = rxrpc::throw_abort;
+ total_tx_size = 0;
+ encode_error = 0;
+ dec_func = NULL;
+}
+
+/*
+ * Construct a client call.
+ */
+rxrpc::Call::Call(struct Call_params *params, const char *op_name, size_t size)
+ : Rx_queue(op_name)
+{
+ endpoint = params->endpoint;
+ peer = params->peer;
+ peer_len = sizeof(peer);
+ service_id = params->peer.srx_service;;
+ state = call_cl_not_started;
+ completed = call_incomplete;
+ abort_code = 0;
+ error = 0;
+ client_side = true;
+ split = false;
+ exclusive = params->exclusive;
+ upgrade_service = params->upgrade_service;
+ have_rx_timestamp = false;
+ memset(&rx_timestamp, 0, sizeof(rx_timestamp));
+ completion_func = NULL;
+ throw_abort_func = rxrpc::throw_abort;
+ total_tx_size = size;
+ encode_error = 0;
+ dec_func = rxrpc::receive_reply;
+}
+
+/*
+ * Set up a client call and prepare for encoding.
+ */
+ref<rxrpc::Call> rxrpc::make_call(struct Call_params *params,
+ const char *op_name,
+ Enc_buffer *buf)
+{
+ ref<rxrpc::Call> call;
+
+ if (params->peer_len < 5 * 2 ||
+ params->peer_len > sizeof(call->peer))
+ throw std::invalid_argument("Peer address too small");
+
+ if (params->peer.srx_family != AF_RXRPC)
+ throw std::invalid_argument("Peer address family unsupported");
+
+ call = params->endpoint->new_client_call(params, op_name, buf->size);
+
+ debug_op("Send %s\n", op_name);
+ return call;
+}
+
+/*
+ * Call destructor
+ */
+rxrpc::Call::~Call()
+{
+}
+
+/*
+ * Reply decoder.
+ */
+void rxrpc::receive_reply(Call *call)
+{
+ if (call->state == rxrpc::Call::call_completed &&
+ call->completed == rxrpc::Call::call_success)
+ debug("Reply received\n");
+}
+
+/*
+ * End of reception of request. There should be no more data in the buffers.
+ */
+void rxrpc::Call::request_received()
+{
+ switch (state) {
+ case call_sv_receiving_params:
+ default:
+ fprintf(stderr, "rxrpc::request_received() called in state %u\n",
+ state);
+ break;
+ }
+}
--- /dev/null
+/* XDR coding and decoding.
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <iostream>
+#include <fmt/core.h>
+#include "rxrpc.H"
+
+using rxrpc::ref;
+
+namespace rxrpc {
+
+static inline size_t calc_padding(size_t x)
+{
+ return (4 - (x & 3)) & 3;
+}
+
+static void enc_advance_iov_by_2(Enc_buffer *);
+static void dec_string_copy(void *, size_t, const void *, size_t);
+static void dec_ignore_padding(void *, size_t, const void *, size_t);
+}
+
+#define debug(fmt, ...) \
+ do { if (rxrpc::debug_xdr) printf("XDR: " fmt, ## __VA_ARGS__); } while (0)
+#define debug_buffer(fmt, ...) \
+ do { if (rxrpc::debug_buffers) printf("BUF: " fmt, ## __VA_ARGS__); } while (0)
+
+#define UAENOMEM 0x2f6df0b
+
+/*
+ * Set up an encoded buffer.
+ */
+ref<rxrpc::Enc_buffer> rxrpc::alloc_enc_buffer(Resv &resv)
+{
+ ref<Enc_buffer> buf;
+ size_t alloc;
+
+ if (resv.max_ioc == 0)
+ abort();
+
+ buf = new Enc_buffer;
+ buf->msg_iov = new iovec[resv.max_ioc];
+
+ alloc = (resv.alloc + 3) / 4;
+ buf->enc_buf = new net_xdr_t[alloc];
+
+ buf->flat = resv.flat;
+ buf->size = resv.alloc + resv.additional;
+ buf->enc_buf_p = buf->enc_buf;
+ buf->enc_buf_end = buf->enc_buf + alloc;
+ buf->msg_iov[0].iov_base = buf->enc_buf_p;
+ debug_buffer("Tx ALLOC s=%zx a=%zx ioc=%u\n", buf->size, resv.alloc, resv.max_ioc);
+ return buf;
+}
+
+rxrpc::Enc_buffer::~Enc_buffer()
+{
+ delete [] msg_iov;
+ delete [] enc_buf;
+}
+
+/*
+ * Advance the iov array by two slots, leaving a slot for a blob.
+ */
+static void rxrpc::enc_advance_iov_by_2(Enc_buffer *buf)
+{
+ unsigned int ioc = buf->ioc;
+
+ /* Note that iov_base may not be 4-bytes aligned if it starts with blob
+ * padding.
+ */
+ buf->msg_iov[ioc].iov_len =
+ (unsigned long)buf->enc_buf_p - (unsigned long)buf->msg_iov[ioc].iov_base;
+
+ ioc += 2;
+ buf->msg_iov[ioc].iov_base = buf->enc_buf_p;
+ buf->ioc = ioc;
+}
+
+/*
+ * Encode an opaque blob to XDR. We copy in the blob if it's small, otherwise
+ * we assign it its own iov slot.
+ */
+void rxrpc::enc_data(Enc_buffer *buf,
+ const void *blob, unsigned int size, unsigned int max)
+{
+ unsigned int ioc;
+ size_t padsize = calc_padding(size);
+ size_t total = size + padsize;
+
+ enc(buf, size);
+
+ if (size == 0) {
+ /* Nothing to store */
+ } else if (reserve_opaque_inline(buf->flat, size, max)) {
+ if (size & 3)
+ buf->enc_buf_p[size >> 2] = 0; /* Clear padding */
+ memcpy(buf->enc_buf_p, blob, size);
+ buf->enc_buf_p += total / sizeof(net_xdr_t);
+ } else {
+ enc_advance_iov_by_2(buf);
+ ioc = buf->ioc;
+ buf->msg_iov[ioc - 1].iov_len = size;
+ buf->msg_iov[ioc - 1].iov_base = (void *)blob;
+ if (padsize > 0) {
+ enc(buf, 0);
+ buf->msg_iov[ioc].iov_base += 4 - padsize;
+ buf->msg_iov[ioc].iov_len += padsize;
+ }
+ }
+}
+
+void rxrpc::Call::dec_underrun(size_t size)
+{
+ if (client_side)
+ throw std::runtime_error(
+ fmt::format("rxrpc XDR decoder underrun ({:x}/{:x}) on {}",
+ dec_amount, size, op_name));
+ else
+ throw AbortRXGEN_SS_UNMARSHAL(op_name);
+}
+
+rxrpc::Rx_queue::Rx_queue(const char *opname) : op_name(opname)
+{
+ dec_index = 0;
+ dec_amount = 0;
+}
+
+rxrpc::Rx_queue::~Rx_queue()
+{
+}
+
+
+void rxrpc::Rx_queue::overrun()
+{
+#if 0
+ if (state == call_completed &&
+ completed != call_success)
+ return;
+#endif
+ printf("Partial fail\n");
+}
+
+/*
+ * Default copier for rxrpc::Call::read.
+ */
+void rxrpc::Rx_queue::read_copy(void *buffer, size_t offset, const void *data, size_t len)
+{
+ memcpy(buffer + offset, data, len);
+}
+
+/*
+ * Read data from the receive buffers.
+ */
+void rxrpc::Rx_queue::read(void *buffer, size_t buffer_size,
+ void (*copy)(void *buffer, size_t offset, const void *data, size_t len))
+{
+ unsigned int seg, i;
+ size_t offset = 0;
+
+ debug("READ %zu/%zu\n", buffer_size, dec_amount);
+
+ if (buffer_size > dec_amount)
+ return overrun();
+
+ while (offset < buffer_size && !dec_buffers.empty()) {
+ Rx_buffer &b = dec_buffers.front();
+
+ seg = b.len - b.used;
+ if (seg > buffer_size - offset)
+ seg = buffer_size - offset;
+
+ debug("B[%u] %zu/%zu from %u/%zu\n",
+ b.index, offset, buffer_size, seg, dec_amount);
+
+ (*copy)(buffer, offset, b.buf + b.used, seg);
+ offset += seg;
+ b.used += seg;
+ dec_amount -= seg;
+
+ if (b.used == b.len) {
+ dec_buffers.pop_front();
+ } else {
+ if (offset != buffer_size)
+ ::abort();
+ }
+ }
+
+ if (offset < buffer_size) {
+ fprintf(stderr, "Inconsistent amount of buffered rx data\n");
+ ::abort();
+ }
+
+ if (dec_buffers.empty() && dec_amount != 0) {
+ fprintf(stderr, "dec_amount should be zero\n");
+ ::abort();
+ }
+
+ if (debug_xdr) {
+ seg = buffer_size <= 32 ? buffer_size : 32;
+ debug("");
+ for (i = 0; i < seg; i++)
+ printf("%02x", ((unsigned char *)buffer)[i]);
+ printf("\n");
+ }
+}
+
+/*
+ * Decode a 32-bit integer.
+ */
+uint32_t rxrpc::dec(Rx_queue &rxq)
+{
+ uint32_t x;
+
+ rxq.read(&x, sizeof(x));
+ return ntohl(x);
+}
+
+/*
+ * Decode a 32-bit integer and check limit.
+ */
+uint32_t rxrpc::dec(Rx_queue &rxq, size_t limit)
+{
+ uint32_t x;
+
+ x = dec(rxq);
+ if (x > limit) {
+ if (rxq.client_side) {
+ throw std::runtime_error(
+ fmt::format("XDR: Decoded value exceeded limit ({:x}/{:x}) on {}",
+ x, limit, rxq.op_name));
+ } else {
+ throw AbortRXGEN_SS_UNMARSHAL(rxq.op_name);
+ }
+ }
+
+ return x;
+}
+
+static void rxrpc::dec_string_copy(void *buffer, size_t offset, const void *data, size_t len)
+{
+ std::string *s = (std::string *)buffer;
+
+ s->insert(offset, (const char *)data, len);
+}
+
+static void rxrpc::dec_ignore_padding(void *buffer, size_t offset, const void *data, size_t len)
+{
+}
+
+/*
+ * Decode a string.
+ */
+void rxrpc::dec_string(Rx_queue &rxq, std::string &s, size_t max)
+{
+ size_t len;
+
+ len = dec(rxq, max);
+ s.reserve(len);
+ rxq.read(&s, len, dec_string_copy);
+ len &= 3;
+ if (len)
+ rxq.read(NULL, 4 - len, dec_ignore_padding);
+}
+
+/*
+ * Decode an opaque
+ */
+void rxrpc::dec_opaque(Rx_queue &rxq, Opaque &data, size_t max)
+{
+ size_t len;
+
+ len = dec(rxq, max);
+ data.reserve(len);
+ rxq.read(data.buffer, len);
+ data.buffer_size = len;
+ len &= 3;
+ if (len)
+ rxq.read(NULL, 4 - len, dec_ignore_padding);
+}
+
+/*
+ * Construct an abort exception class.
+ */
+rxrpc::rx_abort::rx_abort(int abort, const char *name, const char *desc, const char *op)
+ : std::runtime_error(fmt::format("RPC {} aborted with {}", op, name)),
+ abort_code(abort), abort_name(name), op_name(op)
+{
+ if (debug_ops)
+ printf("OP: Rx ABORT: %d %s\n", abort_code, abort_name);
+}
+
+/*
+ * Throw an exception appropriate to the abort code.
+ */
+void rxrpc::throw_abort(int abort_code, const char *op)
+{
+ switch (abort_code) {
+ case RX_CALL_DEAD: throw AbortRX_CALL_DEAD(op);
+ case RX_INVALID_OPERATION: throw AbortRX_INVALID_OPERATION(op);
+ case RX_CALL_TIMEOUT: throw AbortRX_CALL_TIMEOUT(op);
+ case RX_EOF: throw AbortRX_EOF(op);
+ case RX_PROTOCOL_ERROR: throw AbortRX_PROTOCOL_ERROR(op);
+ case RX_USER_ABORT: throw AbortRX_USER_ABORT(op);
+ case RX_ADDRINUSE: throw AbortRX_ADDRINUSE(op);
+ case RX_DEBUGI_BADTYPE: throw AbortRX_DEBUGI_BADTYPE(op);
+ case RXGEN_CC_MARSHAL: throw AbortRXGEN_CC_MARSHAL(op);
+ case RXGEN_CC_UNMARSHAL: throw AbortRXGEN_CC_UNMARSHAL(op);
+ case RXGEN_SS_MARSHAL: throw AbortRXGEN_SS_MARSHAL(op);
+ case RXGEN_SS_UNMARSHAL: throw AbortRXGEN_SS_UNMARSHAL(op);
+ case RXGEN_DECODE: throw AbortRXGEN_DECODE(op);
+ case RXGEN_OPCODE: throw AbortRXGEN_OPCODE(op);
+ case RXGEN_SS_XDRFREE: throw AbortRXGEN_SS_XDRFREE(op);
+ case RXGEN_CC_XDRFREE: throw AbortRXGEN_CC_XDRFREE(op);
+ default:
+ throw unknown_abort(abort_code, op);
+ }
+}
--- /dev/null
+/* Access points into the dynamically loaded Rx transport module
+ *
+ * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef _RXRPC_TRANSPORT_H
+#define _RXRPC_TRANSPORT_H
+
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include "rxrpc.H"
+
+namespace rxrpc {
+
+enum recv_condition {
+ recv_more,
+ recv_final_ack,
+ recv_complete,
+ recv_complete_now_send,
+ recv_abort,
+ recv_net_error,
+ recv_local_error,
+ recv_busy,
+};
+
+#if 0
+typedef void (*incoming_call_func_t)(struct service *service,
+ void *caller_data,
+ const struct sockaddr_rxrpc *peer,
+ socklen_t *peerlen);
+#endif
+
+class Transport : public refcount {
+public:
+ virtual std::string name() const = 0;
+
+ virtual ref<Endpoint> new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+ Security *security) = 0;
+
+ virtual ref<Security> new_security(const std::string &key_name,
+ enum rxrpc::security_auth_level level) = 0;
+
+#if 0
+ virtual int new_service(Endpoint *&endpoint,
+ Service *service,
+ incoming_call_func_t new_call_func,
+ void *caller_data,
+ uint16_t local_port,
+ uint16_t local_service,
+ Security *permits) = 0;
+
+ virtual void shutdown_service(Service *service) = 0;
+
+ virtual int accept_call(Call *call,
+ Service *service,
+ void *caller_data,
+ struct sockaddr_rxrpc *_sa,
+ socklen_t *_salen) = 0;
+#endif
+
+ virtual ~Transport();
+};
+
+} /* end namespace rxrpc */
+
+extern rxrpc::ref<rxrpc::Transport> create_af_rxrpc_transport();
+
+#endif /* _RXRPC_TRANSPORT_H */