From: David Howells Date: Mon, 30 Sep 2019 09:13:00 +0000 (+0100) Subject: Implement basic I/O on top of an AF_RXRPC socket X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=b192430aeced39849fa7666dd42d8172a89628c2;p=users%2Fdhowells%2Fkafs-utils.git Implement basic I/O on top of an AF_RXRPC socket Implement basic I/O on top of an AF_RXRPC socket so that the codecs generated by rxgen.py will work. Signed-off-by: David Howells --- diff --git a/.gitignore b/.gitignore index b25c15b..bda4893 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ *~ +*.o +.*.o.d +*.so +lib/libkafs_utils.* diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..38fda8f --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +all: + $(MAKE) -C lib + +clean: + $(RM) *~ + $(MAKE) -C lib clean + $(MAKE) -C rxgen clean diff --git a/lib/Makefile b/lib/Makefile new file mode 100644 index 0000000..6e81c71 --- /dev/null +++ b/lib/Makefile @@ -0,0 +1,20 @@ +CFLAGS := -g -Wall -Wformat -fpic -O2 -Wno-pointer-arith + +LIB_SRCS := rxrpc_xdr.C af_rxrpc.C rxrpc_core.C +LIB_OBJS := $(patsubst %.C,%.o,$(LIB_SRCS)) + +%.o: %.C + $(CXX) $(CPPFLAGS) -MMD -MF .$@.d $(CFLAGS) -o $@ -c $< + +all: libkafs_utils.a + +libkafs_utils.a: $(LIB_OBJS) + $(AR) rcs -o $@ $(LIB_OBJS) + +DEPS := $(wildcard .*.o.d) +ifneq ($(DEPS),) +include $(DEPS) +endif + +clean: + $(RM) *~ *.o *.so *.a $(DEPS) diff --git a/lib/af_rxrpc.C b/lib/af_rxrpc.C new file mode 100644 index 0000000..7079c68 --- /dev/null +++ b/lib/af_rxrpc.C @@ -0,0 +1,985 @@ +/* AF_RXRPC driver + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. 
+ */ + +#define _XOPEN_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "transport.H" + +using rxrpc::ref; +class af_rxrpc_call; + +#define debug(fmt, ...) \ + do { if (rxrpc::debug_transport) printf("AFRX: " fmt, ## __VA_ARGS__); } while (0) +#define debug_buffer(fmt, ...) \ + do { if (rxrpc::debug_buffers) printf("BUF: " fmt, ## __VA_ARGS__); } while (0) + +#define RXRPC_SELECT_CALL_FOR_RECV 7 /* Specify the call for recvmsg, SIOCINQ, etc. */ +#define RXRPC_SELECT_CALL_FOR_SEND 8 /* Specify the call for splice, SIOCOUTQ, etc. */ + +#define RXRPC_SET_SECURITY_KEY 14 +#define RXRPC_SET_SECURITY_LEVEL 15 + +/* + * Add a call ID to a control message sequence. + */ +#define RXRPC_ADD_CALLID(control, ctrllen, id) \ +do { \ + unsigned char *__buffer = (control); \ + unsigned long *__data; \ + struct cmsghdr *__cmsg; \ + __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_USER_CALL_ID; \ + __data = (unsigned long *)CMSG_DATA(__cmsg); \ + *__data = (id); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add an abort instruction to a control message sequence. + */ +#define RXRPC_ADD_ABORT(control, ctrllen, abort_code) \ +do { \ + void *__buffer = (control); \ + unsigned int *__data; \ + struct cmsghdr *__cmsg; \ + __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_ABORT; \ + __data = (unsigned int *)CMSG_DATA(__cmsg); \ + *__data = (abort_code); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add a security level instruction to a control message sequence. 
+ */ +#define RXRPC_ADD_SECURITY_LEVEL(control, ctrllen, sec_level) \ +do { \ + void *__buffer = (control); \ + unsigned int *__data; \ + struct cmsghdr *__cmsg; \ + __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_SET_SECURITY_LEVEL; \ + __data = (void *)CMSG_DATA(__cmsg); \ + *__data = (sec_level); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add a total transmit length instruction to a control message sequence. + */ +#define RXRPC_ADD_TX_LENGTH(control, ctrllen, length) \ +do { \ + void *__buffer = (control); \ + unsigned long long __data = (length); \ + struct cmsghdr *__cmsg; \ + __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_TX_LENGTH; \ + memcpy(CMSG_DATA(__cmsg), &__data, sizeof(__data)); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add a miscellaneous option to a control message sequence. + */ +#define RXRPC_ADD_MISC_OPTION(control, ctrllen, opt) \ +do { \ + void *__buffer = (control); \ + struct cmsghdr *__cmsg; \ + __cmsg = (struct cmsghdr *)(__buffer + (ctrllen)); \ + __cmsg->cmsg_len = CMSG_LEN(0); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = opt; \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * RxRPC tranport definition. + */ +class af_rxrpc : public rxrpc::Transport { +public: + std::string name() const; + + ref new_local_endpoint(const struct sockaddr_rxrpc *local_addr, + rxrpc::Security *security); + + ref new_security(const std::string &key_name, + enum rxrpc::security_auth_level level); + + ~af_rxrpc(); +}; + +/* + * RxRPC local endpoint representation. 
+ */ +class af_rxrpc_endpoint : public rxrpc::Endpoint { +public: + struct sockaddr_rxrpc local; + int fd; + + /* Message reception infrastructure */ + std::list rx_spare_buffers; /* Spare bufferage for receive */ + unsigned int rx_nr_buffers; /* Number of buffers on the list */ +#define CONN_MSGS 3 +#define CONN_CMSG_SIZE 128 +#define CONN_NR_IOV 4 + struct mmsghdr rx_msgs[CONN_MSGS]; /* Message headers */ + struct sockaddr_rxrpc rx_peer[CONN_MSGS]; /* Mass peer address bufferage */ + struct iovec rx_iov[CONN_MSGS * CONN_NR_IOV]; /* Mass data bufferage */ + unsigned char rx_cmsg[CONN_MSGS * CONN_CMSG_SIZE]; /* Mass control message bufferage */ + + af_rxrpc_endpoint(const struct sockaddr_rxrpc *local_addr, + rxrpc::Security *security); + ~af_rxrpc_endpoint() + { + int i; + if (fd != -1) + close(fd); + + for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) + delete [] (unsigned char *)rx_iov[i].iov_base; + } + + ref new_client_call(struct rxrpc::Call_params *params, + const char *op_name, + size_t size); + void receive(bool nowait); + + ref get_call_from_anciliary(struct msghdr *msg); + void recv_msg(struct msghdr *msg, unsigned int len); +}; + +/* + * AF_RXRPC security descriptor. + */ +class af_rxrpc_security : public rxrpc::Security { +public: + unsigned int min_sec_level; /* RXRPC_SECURITY_* value */ + af_rxrpc_security(const std::string &key_name, + enum rxrpc::security_auth_level level) + : Security(key_name, level) + { + min_sec_level = RXRPC_SECURITY_PLAIN; + } +}; + +/* + * AF_RXRPC call representation. 
+ */ +class af_rxrpc_call : public rxrpc::Call { +public: + bool known_to_kernel; + + af_rxrpc_call(rxrpc::Call_params *params, const char *op_name, size_t size); + ~af_rxrpc_call(); + + void end(int32_t abort_code); + void abort(int32_t abort_code); + void send_data(rxrpc::Enc_buffer *buf, bool more); + void end_split(void); + + void process_anciliary(struct msghdr *msg); + void recv_msg(struct msghdr *msg, unsigned int len); +}; + +/* + * dump the control messages + */ +static __attribute__((unused)) +void dump_cmsg(const char *dir, struct msghdr *msg) +{ + struct timespec tv; + struct cmsghdr *cmsg; + unsigned long long tx_len; + unsigned long user_id; + unsigned char *p; + int abort_code; + int n; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); + p = CMSG_DATA(cmsg); + + debug("%s CMSG: %zu: ", dir, cmsg->cmsg_len); + + switch (cmsg->cmsg_level) { + case SOL_SOCKET: + switch (cmsg->cmsg_type) { + case SO_TIMESTAMPNS_NEW: + printf("SO_TIMESTAMPNS_NEW: "); + if (n != sizeof(tv)) + goto dump_data; + memcpy(&tv, p, sizeof(tv)); + printf("%lu.%lu\n", tv.tv_sec, tv.tv_nsec); + continue; + default: + printf("SOCKET_%d", cmsg->cmsg_type); + break; + } + break; + + case SOL_RXRPC: + switch (cmsg->cmsg_type) { + case RXRPC_USER_CALL_ID: + printf("RXRPC_USER_CALL_ID: "); + if (n != sizeof(user_id)) + goto dump_data; + memcpy(&user_id, p, sizeof(user_id)); + printf("%lx\n", user_id); + continue; + + case RXRPC_ABORT: + printf("RXRPC_ABORT: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%d\n", abort_code); + continue; + + case RXRPC_ACK: + printf("RXRPC_ACK"); + if (n != 0) + goto dump_data_colon; + goto print_nl; + + case RXRPC_NET_ERROR: + printf("RXRPC_NET_ERROR: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%s\n", strerror(abort_code)); + continue; + + case RXRPC_BUSY: + 
printf("RXRPC_BUSY"); + if (n != 0) + goto dump_data_colon; + goto print_nl; + + case RXRPC_LOCAL_ERROR: + printf("RXRPC_LOCAL_ERROR: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%s\n", strerror(abort_code)); + continue; + + case RXRPC_UPGRADE_SERVICE: + printf("RXRPC_UPGRADE_SERVICE\n"); + continue; + case RXRPC_TX_LENGTH: + printf("RXRPC_TX_LENGTH: "); + if (n != sizeof(tx_len)) + goto dump_data; + memcpy(&tx_len, p, sizeof(tx_len)); + printf("%llu\n", tx_len); + continue; + case RXRPC_SET_CALL_TIMEOUT: + printf("RXRPC_SET_CALL_TIMEOUT\n"); + continue; + default: + printf("RXRPC_%d", cmsg->cmsg_type); + break; + } + break; + + default: + printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type); + break; + } + + + dump_data_colon: + printf(": "); + dump_data: + printf("{"); + for (; n > 0; n--, p++) + printf("%02x", *p); + + print_nl: + printf("}\n"); + } +} + +std::string af_rxrpc::name() const +{ + return "AF_RXRPC"; +} + +/* + * Construct a local endpoint. 
+ */ +af_rxrpc_endpoint::af_rxrpc_endpoint(const struct sockaddr_rxrpc *local_addr, + rxrpc::Security *security) + : Endpoint(local_addr) +{ + int opt, i; + + fd = -1; + rx_nr_buffers = 0; + memset(rx_msgs, 0, sizeof(rx_msgs)); + memset(rx_peer, 0, sizeof(rx_peer)); + memset(rx_iov, 0, sizeof(rx_iov)); + memset(rx_cmsg, 0, sizeof(rx_cmsg)); + + /* Set up the message descriptors */ + for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) + rx_iov[i].iov_len = RXGEN_BUFFER_SIZE; + + for (i = 0; i < CONN_MSGS; i++) { + struct mmsghdr *m = &rx_msgs[i]; + + m->msg_hdr.msg_iov = rx_iov + i * CONN_NR_IOV; + m->msg_hdr.msg_iovlen = CONN_NR_IOV; + m->msg_hdr.msg_name = &rx_peer[i]; + m->msg_hdr.msg_namelen = sizeof(rx_peer[i]); + m->msg_hdr.msg_control = rx_cmsg; + m->msg_hdr.msg_controllen = CONN_CMSG_SIZE; + m->msg_hdr.msg_flags = 0; + } + + /* Open up a socket for talking to the AF_RXRPC module */ + fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET6); + if (fd < 0) + throw rxrpc::syserror("socket()"); + + try { + opt = 1; + if (setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPNS_NEW, (char *)&opt, sizeof(opt)) == -1) + throw rxrpc::syserror("setsockopt(SO_TIMESTAMPNS_NEW)"); + + if (address.srx_family && + bind(fd, (struct sockaddr *)&address, sizeof(address)) == -1) + throw rxrpc::syserror("bind"); + + if (security) { + af_rxrpc_security *sec = dynamic_cast(security); + if (sec->level != rxrpc::security_no_auth) { + debug("OPT RXRPC_SECURITY_KEY %s\n", security->key_name.c_str()); + if (setsockopt(fd, SOL_RXRPC, RXRPC_SECURITY_KEY, + security->key_name.c_str(), + security->key_name.size()) < 0) + throw rxrpc::syserror("setsockopt(RXRPC_SECURITY_KEY)"); + + debug("OPT RXRPC_MIN_SECURITY_LEVEL %u\n", sec->min_sec_level); + if (setsockopt(fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL, + &sec->min_sec_level, + sizeof(sec->min_sec_level)) < 0) + throw rxrpc::syserror("setsockopt(RXRPC_MIN_SECURITY_LEVEL)"); + } + } + } catch (...) 
{ + close(fd); + throw; + } +} + +/* + * Initialise the transport module + */ +ref af_rxrpc::new_local_endpoint(const struct sockaddr_rxrpc *local_addr, + rxrpc::Security *security) +{ + ref endpoint; + + if (local_addr && + local_addr->transport_len) { + if (local_addr->transport_len > sizeof(local_addr->transport)) + throw std::invalid_argument("Transport address too long"); + + switch (endpoint->address.transport.family) { + case AF_INET: + case AF_INET6: + break; + default: + throw std::invalid_argument("Unsupported transport type\n"); + } + } + + return new af_rxrpc_endpoint(local_addr, security); +} + +/* + * Set up a security descriptor. Note that the key name can be NULL (for + * instance if -noauth was passed). + */ +ref af_rxrpc::new_security(const std::string &key_name, + enum rxrpc::security_auth_level level) +{ + ref security; + + security = new af_rxrpc_security(key_name, level); + + switch (level) { + case rxrpc::security_no_auth: + break; + case rxrpc::security_local_auth: + throw std::invalid_argument("Local auth not supported by AF_RXRPC transport"); + case rxrpc::security_unset: + case rxrpc::security_clear: + security->min_sec_level = RXRPC_SECURITY_PLAIN; + if (security->key_name.size() == 0) + throw std::invalid_argument("Unspecified key name"); + break; + case rxrpc::security_integrity_only: + security->min_sec_level = RXRPC_SECURITY_AUTH; + if (security->key_name.size() == 0) + throw std::invalid_argument("Unspecified key name"); + break; + case rxrpc::security_encrypt: + security->min_sec_level = RXRPC_SECURITY_ENCRYPT; + if (security->key_name.size() == 0) + throw std::invalid_argument("Unspecified key name"); + break; + } + + return security; +} + +/* + * Construct a client call. + */ +af_rxrpc_call::af_rxrpc_call(rxrpc::Call_params *params, const char *op_name, size_t size) + : Call(params, op_name, size) +{ + known_to_kernel = false; +} + +/* + * Allocate a call handle for a new call. 
+ */ +ref af_rxrpc_endpoint::new_client_call(rxrpc::Call_params *params, + const char *op_name, + size_t size) +{ + return new af_rxrpc_call(params, op_name, size); +} + +/* + * Dump data from an output buffer. + */ +static void dump_enc_buffers(rxrpc::Enc_buffer *&buf) +{ + unsigned int pos = 0, ioc, i; + bool nl = false; + + for (ioc = 0; ioc < buf->ioc; ioc++) { + for (i = 0; i < buf->msg_iov[ioc].iov_len; i++) { + if (!(pos & 3)) { + if (!(pos & 31)) + printf("%06x: ", pos); + else + printf(" "); + } + + printf("%02x", ((unsigned char *)buf->msg_iov[ioc].iov_base)[i]); + pos++; + nl = false; + if ((pos & 31) == 0) { + printf("\n"); + nl = true; + } + } + } + + if (!nl) + printf("\n"); +} + +/* + * Send buffered data. + */ +void af_rxrpc_call::send_data(rxrpc::Enc_buffer *buf, bool more) +{ + af_rxrpc_endpoint *endpoint = this->endpoint.cast(); + struct msghdr msg; + unsigned char control[128]; + unsigned int flags = 0, tmp, i; + socklen_t olen; + ssize_t slen; + size_t ctrllen; + char abuf[512]; + + if (rxrpc::debug_transport) + printf("\n"); + + /* Switch into encode state */ + switch (state) { + case call_cl_not_started: + case call_sv_processing: + state = (enum call_state)(state + 1); + break; + case call_cl_encoding_params: + case call_sv_encoding_response: + break; + default: + fprintf(stderr, "RxRPC: Send in bad call state (%d)\n", state); + ::abort(); + } + + /* Request an operation */ + ctrllen = 0; + RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)(rxrpc::refcount *)this); + if (exclusive) + RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_EXCLUSIVE_CALL); + if (upgrade_service) + RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_UPGRADE_SERVICE); + if (total_tx_size) { + debug("total Tx size %llu\n", total_tx_size); + RXRPC_ADD_TX_LENGTH(control, ctrllen, total_tx_size); + } + + msg.msg_name = &peer; + msg.msg_namelen = peer_len; + msg.msg_iov = buf ? buf->msg_iov : NULL; + msg.msg_iovlen = buf ? 
buf->ioc : 0; + msg.msg_control = control; + msg.msg_controllen = ctrllen; + msg.msg_flags = 0; + + if (more) + flags |= MSG_MORE; + + switch (peer.transport.family) { + case AF_INET: + inet_ntop(peer.transport.family, &peer.transport.sin.sin_addr, + abuf, sizeof(abuf)); + debug("Tx %u Addr %s:%u\n", + peer.srx_service, abuf, ntohs(peer.transport.sin.sin_port)); + break; + case AF_INET6: + inet_ntop(peer.transport.family, &peer.transport.sin6.sin6_addr, + abuf, sizeof(abuf)); + debug("Tx %u Addr [%s]:%u\n", + peer.srx_service, abuf, ntohs(peer.transport.sin6.sin6_port)); + break; + } + + if (rxrpc::debug_transport) { + dump_cmsg("Tx", &msg); + for (i = 0; i < msg.msg_iovlen; i++) + debug("Tx IOV[%02u] %04zu %p\n", + i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base); + } + if (rxrpc::debug_data) + dump_enc_buffers(buf); + + olen = sizeof(tmp); + getsockopt(endpoint->fd, SOL_SOCKET, SO_ERROR, (char *)&tmp, &olen); + + /* Send the data */ + slen = sendmsg(endpoint->fd, &msg, flags); + if (slen < 0) { + debug("SENDMSG: %m\n"); + throw rxrpc::syserror("sendmsg"); + } + + debug("SENDMSG: %zd%s\n", slen, more ? " [more]" : ""); + known_to_kernel = true; + + if (!more) { + state = (enum call_state)(state + 1); + if (state == call_cl_encoding_params || + state == call_sv_encoding_response) + state = (enum call_state)(state + 1); + } +} + +/* + * End a split phase. + */ +void af_rxrpc_call::end_split(void) +{ + switch (state) { + case call_cl_encoding_params: { + send_data(NULL, false); + break; + } + + case call_cl_receiving_response: + dec_func = rxrpc::receive_reply; + dec_func(this); + break; + + default: + fprintf(stderr, "rxrpc::end_split() called in state %u\n", + state); + return; + } +} + +/* + * Determine the call a received message applies to by examining the anciliary + * data from recvmsg(). 
+ */ +ref af_rxrpc_endpoint::get_call_from_anciliary(struct msghdr *msg) +{ + ref call; + struct cmsghdr *cmsg; + unsigned long tmp; + + if (rxrpc::debug_transport) + dump_cmsg("Rx", msg); + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + unsigned char *p = CMSG_DATA(cmsg); + int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); + + if (cmsg->cmsg_level != SOL_RXRPC) + continue; + + switch (cmsg->cmsg_type) { + case RXRPC_USER_CALL_ID: + if (n != sizeof(tmp)) { + fprintf(stderr, "User call ID wrong size\n"); + abort(); + } + + memcpy(&tmp, p, sizeof(tmp)); + call.assign_cast((rxrpc::refcount *)tmp); + return call; + } + } + + throw std::runtime_error("recvmsg() didn't include a RXRPC_USER_CALL_ID cmsg"); +} + +/* + * Process the metadata given to us by recvmsg(). + */ +void af_rxrpc_call::process_anciliary(struct msghdr *msg) +{ + struct sockaddr_rxrpc *srx = (struct sockaddr_rxrpc *)msg->msg_name; + struct cmsghdr *cmsg; + + if (service_id != srx->srx_service) + debug("Upgraded service to %u\n", srx->srx_service); + service_id = srx->srx_service; /* Changes if service upgrade */ + + if ((msg->msg_flags & MSG_EOR) && state != call_completed) { + debug("Rx EOR\n"); + known_to_kernel = false; + state = call_completed; + completed = call_success; + } + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + unsigned char *p = CMSG_DATA(cmsg); + int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); + + if (cmsg->cmsg_level == SOL_SOCKET) { + switch (cmsg->cmsg_type) { + case SO_TIMESTAMPNS_NEW: + if (have_rx_timestamp) + continue; + + if (n != sizeof(rx_timestamp)) { + fprintf(stderr, "SO_TIMESTAMPNS wrong size\n"); + std::abort(); + } + + memcpy(&rx_timestamp, p, n); + have_rx_timestamp = true; + debug("Rx tstamp\n"); + break; + } + } + + if (cmsg->cmsg_level != SOL_RXRPC) + continue; + + switch (cmsg->cmsg_type) { + case RXRPC_USER_CALL_ID: + break; + + case RXRPC_ABORT: + if (n == 4) + memcpy(&abort_code, p, 4); + error = 
ECONNABORTED; + state = call_completed; + completed = call_remotely_aborted; + break; + + case RXRPC_NET_ERROR: + if (n != sizeof(error)) + throw std::runtime_error("Can't parse RXRPC_NET_ERROR cmsg"); + memcpy(&error, p, 4); + state = call_completed; + completed = call_network_error; + break; + + case RXRPC_LOCAL_ERROR: + if (n != sizeof(error)) + throw std::runtime_error("Can't parse RXRPC_LOCAL_ERROR cmsg"); + memcpy(&error, p, 4); + state = call_completed; + completed = call_local_error; + break; + + case RXRPC_BUSY: + error = ECONNREFUSED; + state = call_completed; + completed = call_rejected_busy; + break; + + case RXRPC_ACK: + if (state != call_sv_waiting_for_final_ack) { + fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n", + state); + std::abort(); + } + state = call_completed; + completed = call_success; + break; + + default: + fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type); + break; + } + } + + if (state == call_completed) { + if (completion_func) { + completion_func(this); + completion_func = NULL; + } else { + switch (completed) { + case call_remotely_aborted: + throw_abort_func(abort_code, op_name); + case call_network_error: + throw rxrpc::network_error(error); + case call_local_error: + case call_rejected_busy: + throw rxrpc::syserror(error); + default: + break; + } + } + } + + debug("<--af_rxrpc_call::process_anciliary()\n"); +} + +/* + * Process a message from a call. + */ +void af_rxrpc_call::recv_msg(struct msghdr *msg, unsigned int len) +{ + af_rxrpc_endpoint *endpoint = this->endpoint.cast(); + struct iovec *iov = msg->msg_iov; + unsigned int tmp; + int i; + + process_anciliary(msg); + + if (len > CONN_NR_IOV * RXGEN_BUFFER_SIZE) { + fprintf(stderr, "Too much data returned\n"); + ::abort(); + } + + if (rxrpc::debug_data) { + tmp = len; + for (i = 0; i < CONN_NR_IOV && tmp > 0; i++) { + unsigned int seg = (tmp <= iov[i].iov_len) ? 
tmp : iov[i].iov_len; + bool nl = false; + printf("Rx IOV[%u] %zu %u %p\n", i, iov[i].iov_len, seg, iov[i].iov_base); + + unsigned int j, s = seg; // <= 32 ? seg : 32; + for (j = 0; j < s; j++) { + if (!(j & 3)) { + if (!(j & 31)) + printf("%06x: ", j); + else + printf(" "); + } + printf("%02x", ((unsigned char *)iov[i].iov_base)[j]); + nl = false; + if ((j & 31) == 31) { + printf("\n"); + nl = true; + } + } + if (!nl) + printf("\n"); + tmp -= seg; + } + } + + /* Update the buffers with the amount of data stored. */ + for (i = 0; i < CONN_NR_IOV && len > 0; i++) { + rxrpc::Rx_buffer &b = endpoint->rx_spare_buffers.front(); + unsigned int n; + + b.index = dec_index++; + + n = RXGEN_BUFFER_SIZE; + if (n > len) + n = len; + debug("BUF[%u] +%-4u %zu {%u}\n", + b.index, n, dec_amount + n, endpoint->rx_nr_buffers); + b.len = n; + b.buf = (unsigned char *)iov[i].iov_base; + iov[i].iov_base = NULL; + + endpoint->rx_nr_buffers--; + std::list::iterator to = endpoint->rx_spare_buffers.begin(); + to++; + dec_buffers.splice(dec_buffers.end(), + endpoint->rx_spare_buffers, + endpoint->rx_spare_buffers.begin(), + to); + dec_amount += n; + len -= n; + } + + dec_func(this); +} + +/* + * Process a message. + */ +void af_rxrpc_endpoint::recv_msg(struct msghdr *msg, unsigned int len) +{ + af_rxrpc_call *call = get_call_from_anciliary(msg); + + call->recv_msg(msg, len); +} + +/* + * Receive data from a variety of calls in one go. 
+ */ +void af_rxrpc_endpoint::receive(bool nowait) +{ + struct iovec *iov = rx_iov; + int count, i, j; + + if (rxrpc::debug_transport) + printf("\n"); + + /* Tamp down the buffers to make sure we cycle through the memory */ + j = 0; + for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) { + if (!iov[i].iov_base) + continue; + iov[j++].iov_base = iov[i].iov_base; + } + + for (; j < CONN_MSGS * CONN_NR_IOV; j++) + iov[j].iov_base = new unsigned char[RXGEN_BUFFER_SIZE]; + + /* Make sure we have sufficient buffer descriptors */ + while (rx_nr_buffers < CONN_MSGS * CONN_NR_IOV) { + rx_spare_buffers.emplace_back(); + rx_nr_buffers++; + } + + /* Receive messages. The descriptor array is already set up and gets + * reset as messages are processed. + */ + count = recvmmsg(fd, rx_msgs, CONN_MSGS, + nowait ? MSG_DONTWAIT : MSG_WAITFORONE, + NULL); + debug("RECVMMSG: %d\n", count); + if (count == -1) { + if (errno != ENODATA) + throw rxrpc::syserror("recvmmsg"); + return; + } + + for (i = 0; i < count; i++) { + struct mmsghdr *m = &rx_msgs[i]; + struct msghdr *msg = &m->msg_hdr; + + debug("Rx DATA: %d [fl:%x]\n", m->msg_len, msg->msg_flags); + + recv_msg(msg, m->msg_len); + + /* Reset the message descriptor. */ + m->msg_hdr.msg_namelen = sizeof(rx_peer[i]); + m->msg_hdr.msg_controllen = CONN_CMSG_SIZE; + m->msg_hdr.msg_flags = 0; + } +} + +/* + * Abort a call. 
+ */ +void af_rxrpc_call::abort(int32_t abort_code) +{ + af_rxrpc_endpoint *endpoint = this->endpoint.cast(); + struct msghdr msg; + size_t ctrllen; + unsigned char control[128]; + + if (known_to_kernel) { + memset(control, 0, sizeof(control)); + ctrllen = 0; + RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)(rxrpc::refcount *)this); + RXRPC_ADD_ABORT(control, ctrllen, abort_code); + + msg.msg_name = &peer; + msg.msg_namelen = peer_len; + msg.msg_iov = NULL; + msg.msg_iovlen = 0; + msg.msg_control = control; + msg.msg_controllen = ctrllen; + msg.msg_flags = 0; + + sendmsg(endpoint->fd, &msg, 0); + state = call_completed; + completed = call_locally_aborted; + } +} + +/* + * End a call, aborting it if necessary. + */ +void af_rxrpc_call::end(int32_t abort_code) +{ + if (state != call_completed) + abort(abort_code); +} + +/* + * Destroy a call. + */ +af_rxrpc_call::~af_rxrpc_call() +{ + if (known_to_kernel) { + fprintf(stderr, "Call still known to kernel\n"); + end(RX_USER_ABORT); + } +} + +af_rxrpc::~af_rxrpc() +{ +} + +ref create_af_rxrpc_transport() +{ + return new af_rxrpc(); +} diff --git a/lib/rxrpc.H b/lib/rxrpc.H new file mode 100644 index 0000000..4fc2830 --- /dev/null +++ b/lib/rxrpc.H @@ -0,0 +1,594 @@ +/* Core definitions. + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. 
+ */ + +#ifndef RXRPC_HH +#define RXRPC_HH + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "pointer.H" + +namespace rxrpc { + +class Call; + +extern bool debug_buffers; +extern bool debug_core; +extern bool debug_data; +extern bool debug_ops; +extern bool debug_transport; +extern bool debug_xdr; + +typedef unsigned int net_xdr_t; + +struct xdr_u64 { net_xdr_t hi, lo; }; + +enum security_auth_level { + security_unset, + security_no_auth, + security_local_auth, + security_clear, + security_integrity_only, + security_encrypt, +}; + +enum error_source { + error_none, + error_invalid_argument, + error_remote_abort, + error_peer_busy, + error_from_system, + error_from_network, + error_from_parameters, +}; + +/* + * Local transport endpoint. + */ +class Endpoint : public refcount { +public: + struct sockaddr_rxrpc address; + + Endpoint(const struct sockaddr_rxrpc *local_addr) + { + if (local_addr) + memcpy(&address, local_addr, sizeof(address)); + else + memset(&address, 0, sizeof(address)); + } + + virtual ~Endpoint() = 0; + virtual void receive(bool nowait=false) = 0; + virtual ref new_client_call(struct Call_params *params, + const char *op_name, + size_t size) = 0; +}; + +/* + * Security details to use. + */ +class Security : public refcount { +public: + std::string key_name; /* Name of key to use */ + enum security_auth_level level; + Security(const std::string &_key_name, + enum rxrpc::security_auth_level _level) + : key_name(_key_name), level(_level) + {} + virtual ~Security(); +}; + +/* + * Call parameters. 
+ */ +struct Call_params { + ref endpoint; + ref security; + struct sockaddr_rxrpc peer; + socklen_t peer_len; + bool exclusive; + bool upgrade_service; + unsigned short service_id_used; + + Call_params() + { + memset(&peer, 0, sizeof(peer)); + peer_len = 0; + exclusive = false; + upgrade_service = false; + service_id_used = 0; + } +}; + +class Resv { +public: + unsigned int max_ioc; /* Size of msg_iov[] */ + bool flat; /* T to do it all in one iovec */ + size_t alloc; /* Size of buffer allocated */ + size_t additional; /* Size of additional content */ +}; + +/* + * RxRPC encoded data buffer. + */ +class Enc_buffer : public refcount { +public: + net_xdr_t *enc_buf; /* Encoding buffer */ + net_xdr_t *enc_buf_p; /* Running pointer into encoding buffer */ + net_xdr_t *enc_buf_end; /* Encoding buffer limit */ + struct iovec *msg_iov; /* For sendmsg()'s msghdr */ + unsigned int ioc; /* Occupancy of msg_iov[] */ + bool flat; /* T to do it all in one iovec */ + size_t size; /* Size of content, including additional iovecs */ + Enc_buffer() + { + enc_buf = NULL; + enc_buf_p = NULL; + enc_buf_end = NULL; + msg_iov = NULL; + ioc = 0; + flat = false; + size = 0; + } + ~Enc_buffer(); +}; + +/* + * RxRPC received data buffer. + */ +class Rx_buffer { +public: +#define RXGEN_BUFFER_SIZE 1024 + unsigned char *buf; + unsigned short len; /* Amount of data in buffer */ + unsigned short used; /* How much data used */ + unsigned int index; /* Index in buffer list */ + bool no_delete; /* Don't delete the buffer */ + + Rx_buffer() + { + buf = NULL; + len = 0; + used = 0; + index = 0; + no_delete = false; + } + ~Rx_buffer() + { + if (!no_delete) + delete [] buf; + buf = NULL; + } +}; + +/* + * Received data queue. 
+ */ +class Rx_queue { +public: + const char *op_name; /* Name of the RPC operation */ + bool client_side; /* True if client side of call */ + unsigned int dec_index; + std::list dec_buffers; /* List of received buffers */ + size_t dec_amount; /* Amount of data in dec_buffers */ + + Rx_queue(const char *opname = NULL); + ~Rx_queue(); + static void read_copy(void *buffer, size_t offset, const void *data, size_t len); + void read(void *buffer, size_t buffer_size, + void (*copy)(void *buffer, size_t offset, const void *data, size_t len) + = read_copy); + virtual void overrun(); +}; + +/* + * RxRPC call. + */ +class Call : public refcount, public Rx_queue { +public: + enum call_state { + call_cl_not_started, + call_cl_encoding_params, + call_cl_receiving_response, + + call_sv_not_started, + call_sv_receiving_params, + call_sv_wait_for_no_MSG_MORE, + call_sv_processing, + call_sv_encoding_response_split, + call_sv_encoding_response, + call_sv_response_encoded, + call_sv_waiting_for_final_ack, + call_completed, + }; + + enum call_completion { + call_incomplete, + call_success, + call_remotely_aborted, + call_locally_aborted, + call_network_error, + call_local_error, + call_rejected_busy, + }; + + ref endpoint; + struct sockaddr_rxrpc peer; + socklen_t peer_len; + unsigned short service_id; + enum call_state state; + enum call_completion completed; + uint32_t abort_code; + unsigned int error; + + bool split; /* True if split-mode call */ + bool exclusive; + bool upgrade_service; + bool have_rx_timestamp; + + struct timespec rx_timestamp; /* Timestamp of first received packet */ + + void (*completion_func)(Call *); + void (*throw_abort_func)(int ac, const char *op); + + /* Transmission phase state */ + unsigned long long total_tx_size; /* Total expected transmission size */ + uint32_t encode_error; + + /* Receive phase state */ + void (*dec_func)(Call *); + + Call(); + Call(struct Call_params *params, const char *op_name, size_t size); + virtual ~Call(); + + virtual void 
send_data(Enc_buffer *buf, bool more=false) = 0; + virtual void end(int32_t abort_code=0) = 0; + virtual void abort(int32_t abort_code) = 0; + virtual void end_split(void) = 0; + + void dec_underrun(size_t); + void request_received(); +}; + +extern void find_transport(void); +extern ref new_local_endpoint(const struct sockaddr_rxrpc *local_sa = NULL, + rxrpc::Security *security = NULL); +extern ref new_security(const std::string &key_name, + enum security_auth_level level); + +extern ref alloc_enc_buffer(Resv &resv); +extern ref make_call(Call_params *z_params, + const char *op_name, + Enc_buffer *z_request); +extern void receive_reply(Call *call); + +/* + * UUID. + */ +struct Uuid { + uuid_t uuid; + Uuid() { memset(uuid, 0, sizeof(uuid)); } +}; + +/* + * Opaque data. + */ +struct Opaque { + size_t buffer_size; + void *buffer; + bool del_buffer; + + inline Opaque(void) + { + buffer_size = 0; + buffer = NULL; + del_buffer = false; + } + inline ~Opaque(void) + { + if (del_buffer) + free(buffer); + } + inline size_t size(void) const { return buffer_size; } + inline void reserve(size_t size) + { + if (buffer && del_buffer) + free(buffer); + buffer = malloc(size + 1); + ((unsigned char *)buffer)[size] = 0; + del_buffer = true; + } +}; + +/* + * XDR encoding size calculation routines + */ +static inline void reserve(Resv &resv) +{ + resv.alloc += 4; +} + +static inline void reserve_a32(Resv &resv, unsigned int n) +{ + resv.alloc += 4 * n; +} + +static inline void reserve_u64(Resv &resv) +{ + resv.alloc += 8; +} + +static inline void reserve_a64(Resv &resv, unsigned int n) +{ + resv.alloc += 8 * n; +} + +static inline size_t xdr_round_up(unsigned int size) +{ + return (size + sizeof(net_xdr_t) - 1) & ~(sizeof(net_xdr_t) - 1); +} + +static inline bool reserve_opaque_inline(bool flat, unsigned int size, unsigned int max) +{ + return flat || (max <= 1024 && size <= 1024); +} + +static inline void reserve_opaque(Resv &resv, const Opaque &data, + unsigned int max) +{ + size_t 
size = data.size(); + size_t rs = xdr_round_up(size); + + reserve(resv); + if (reserve_opaque_inline(resv.flat, size, max)) { + resv.alloc += rs; + } else { + if ((size & 3) == 0) { + resv.additional += rs; + } else { + resv.alloc += 4; + resv.additional += rs - 4; + } + resv.max_ioc += 2; + } +} + +static inline void reserve_string(Resv &resv, const std::string &s, unsigned int max) +{ + Opaque op; + + op.buffer_size = s.size(), + reserve_opaque(resv, op, max); +} + +static inline void reserve_opr_uuid(Resv &resv, const Uuid &uuid) +{ + resv.alloc += 16; +} + +/* + * XDR encoding routines + */ +extern void enc_data(Enc_buffer *buf, const void *data, + unsigned int len, unsigned int max); + +static inline void enc(Enc_buffer *z_buf, uint32_t val) +{ + *z_buf->enc_buf_p++ = htonl(val); +} + +static inline void enc_u64(Enc_buffer *z_buf, uint64_t x) +{ + enc(z_buf, x >> 32); + enc(z_buf, x); +} + +static inline void enc_string(Enc_buffer *buf, const std::string &s, unsigned int max) +{ + enc_data(buf, s.c_str(), s.size(), max); +} + +static inline void enc_opaque(Enc_buffer *buf, const Opaque &data, unsigned int max) +{ + enc_data(buf, data.buffer, data.size(), max); +} + +static inline void enc_block(Enc_buffer *buf, const void *p, size_t s) +{ + memcpy(buf->enc_buf_p, p, s); + buf->enc_buf_p += s / 4; +} + +static inline void enc_uuid(Enc_buffer *buf, const Uuid &u) +{ + enc_block(buf, u.uuid, sizeof(u.uuid)); +} + +static inline void seal_buffer(Enc_buffer *buf) +{ + unsigned int len; + + /* call->ioc points to the iovec being filled, so we need to advance it + * if that has anything in it. 
+ */ + len = (unsigned long)buf->enc_buf_p - (unsigned long)buf->msg_iov[buf->ioc].iov_base; + if (len) { + buf->msg_iov[buf->ioc].iov_len = len; + buf->ioc++; + } +} + +/* + * XDR decoding routines + */ +extern uint32_t dec(Rx_queue &rxq); +extern uint32_t dec(Rx_queue &rxq, size_t limit); +extern void dec_string(Rx_queue &rxq, std::string &, size_t max = UINT_MAX); +extern void dec_opaque(Rx_queue &rxq, Opaque &, size_t max = UINT_MAX); +extern void *dec_blob(Rx_queue &rxq, size_t size); + +static inline uint64_t dec_u64(Rx_queue &rxq) +{ + uint64_t x; + x = (uint64_t)dec(rxq) << 32; + x |= (uint64_t)dec(rxq); + return x; +} + +/* + * Error handling. + */ +struct nomem : std::exception { +}; + +class syserror : public std::system_error { +protected: + syserror() {} +public: + syserror(const char *s) + : system_error(std::error_code(errno, std::generic_category()), s) + { + } + syserror(const std::string &s) + : system_error(std::error_code(errno, std::generic_category()), s) + { + } + syserror(int err, const char *s) + : system_error(std::error_code(err, std::generic_category()), s) + { + } + syserror(int err, const std::string &s) + : system_error(std::error_code(err, std::generic_category()), s) + { + } + syserror(int err) + : system_error(std::error_code(err, std::generic_category())) + { + } +}; + +class network_error : public syserror { +public: + __attribute__((format(printf,2,3))) + network_error(const char *fmt, ...) 
+ { + } + network_error(int err) + : syserror(err) + { + } +}; + +class rx_abort : public std::runtime_error { +protected: + int abort_code; + const char *abort_name; + const char *op_name; + + rx_abort(int abort, const char *name, const char *desc, const char *op); +public: + int get_abort_code() const { return abort_code; } +}; + +class unknown_abort : public rx_abort { +public: + unknown_abort(int abort_code, const char *op = NULL) + : rx_abort(abort_code, "UnknownAbort", "", op) {} +}; + +struct AbortRx : rx_abort { + AbortRx(int ac, const char *sym, const char *desc, const char *op) + : rx_abort(ac, sym, desc, op) {} +}; +struct AbortRX_CALL_DEAD : AbortRx { + AbortRX_CALL_DEAD(const char *op = NULL) + : AbortRx(RX_CALL_DEAD, "RX_CALL_DEAD", "", op) {} +}; +struct AbortRX_INVALID_OPERATION : AbortRx { + AbortRX_INVALID_OPERATION(const char *op = NULL) + : AbortRx(RX_INVALID_OPERATION, "RX_INVALID_OPERATION", "", op) {} +}; +struct AbortRX_CALL_TIMEOUT : AbortRx { + AbortRX_CALL_TIMEOUT(const char *op = NULL) + : AbortRx(RX_CALL_TIMEOUT, "RX_CALL_TIMEOUT", "", op) {} +}; +struct AbortRX_EOF : AbortRx { + AbortRX_EOF(const char *op = NULL) + : AbortRx(RX_EOF, "RX_EOF", "", op) {} +}; +struct AbortRX_PROTOCOL_ERROR : AbortRx { + AbortRX_PROTOCOL_ERROR(const char *op = NULL) + : AbortRx(RX_PROTOCOL_ERROR, "RX_PROTOCOL_ERROR", "", op) {} +}; +struct AbortRX_USER_ABORT : AbortRx { + AbortRX_USER_ABORT(const char *op = NULL) + : AbortRx(RX_USER_ABORT, "RX_USER_ABORT", "", op) {} +}; +struct AbortRX_ADDRINUSE : AbortRx { + AbortRX_ADDRINUSE(const char *op = NULL) + : AbortRx(RX_ADDRINUSE, "RX_ADDRINUSE", "", op) {} +}; +struct AbortRX_DEBUGI_BADTYPE : AbortRx { + AbortRX_DEBUGI_BADTYPE(const char *op = NULL) + : AbortRx(RX_DEBUGI_BADTYPE, "RX_DEBUGI_BADTYPE", "", op) {} +}; + +struct AbortRXGEN_CC_MARSHAL : AbortRx { + AbortRXGEN_CC_MARSHAL(const char *op = NULL) + : AbortRx(RXGEN_CC_MARSHAL, "RXGEN_CC_MARSHAL", "", op) {} +}; +struct AbortRXGEN_CC_UNMARSHAL : AbortRx 
{ + AbortRXGEN_CC_UNMARSHAL(const char *op = NULL) + : AbortRx(RXGEN_CC_UNMARSHAL, "RXGEN_CC_UNMARSHAL", "", op) {} +}; +struct AbortRXGEN_SS_MARSHAL : AbortRx { + AbortRXGEN_SS_MARSHAL(const char *op = NULL) + : AbortRx(RXGEN_SS_MARSHAL, "RXGEN_SS_MARSHAL", "", op) {} +}; +struct AbortRXGEN_SS_UNMARSHAL : AbortRx { + AbortRXGEN_SS_UNMARSHAL(const char *op = NULL) + : AbortRx(RXGEN_SS_UNMARSHAL, "RXGEN_SS_UNMARSHAL", "", op) {} +}; +struct AbortRXGEN_DECODE : AbortRx { + AbortRXGEN_DECODE(const char *op = NULL) + : AbortRx(RXGEN_DECODE, "RXGEN_DECODE", "", op) {} +}; +struct AbortRXGEN_OPCODE : AbortRx { + AbortRXGEN_OPCODE(const char *op = NULL) + : AbortRx(RXGEN_OPCODE, "RXGEN_OPCODE", "", op) {} +}; +struct AbortRXGEN_SS_XDRFREE : AbortRx { + AbortRXGEN_SS_XDRFREE(const char *op = NULL) + : AbortRx(RXGEN_SS_XDRFREE, "RXGEN_SS_XDRFREE", "", op) {} +}; +struct AbortRXGEN_CC_XDRFREE : AbortRx { + AbortRXGEN_CC_XDRFREE(const char *op = NULL) + : AbortRx(RXGEN_CC_XDRFREE, "RXGEN_CC_XDRFREE", "", op) {} +}; + +extern void throw_abort(int abort_code, const char *op=NULL); + +} /* end namespace rxrpc */ + +#endif /* RXRPC_HH */ diff --git a/lib/rxrpc_core.C b/lib/rxrpc_core.C new file mode 100644 index 0000000..9b0b572 --- /dev/null +++ b/lib/rxrpc_core.C @@ -0,0 +1,181 @@ +/* rxrpc core + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. 
+ */ + +#include +#include +#include +#include +#include +#include "rxrpc.H" +#include "transport.H" + +using rxrpc::ref; + +bool rxrpc::debug_buffers; /* Debug buffer handling */ +bool rxrpc::debug_core; /* Debug core management */ +bool rxrpc::debug_data; /* Dump data */ +bool rxrpc::debug_ops; /* Debug operations */ +bool rxrpc::debug_transport; /* Debug transport */ +bool rxrpc::debug_xdr; /* Debug XDR encoding/decoding */ + +#define debug(fmt, ...) \ + do { if (rxrpc::debug_core) printf("CORE: " fmt, ## __VA_ARGS__); } while (0) +#define debug_op(fmt, ...) \ + do { if (rxrpc::debug_ops) printf("OP: " fmt, ## __VA_ARGS__); } while (0) + +rxrpc::refcount::~refcount() {} +rxrpc::Security::~Security() {} +rxrpc::Transport::~Transport() {} + +static ref rxrpc_transport; + +void rxrpc::find_transport(void) +{ + if (!rxrpc_transport) + rxrpc_transport = create_af_rxrpc_transport(); +} + +/* + * Set up a new transport + */ +ref rxrpc::new_local_endpoint(const struct sockaddr_rxrpc *local_addr, + rxrpc::Security *security) +{ + if (!rxrpc_transport) + throw std::invalid_argument("No Rx transport loaded"); + + return rxrpc_transport->new_local_endpoint(local_addr, security); +} + +/* + * Destroy a local endpoint. + */ +rxrpc::Endpoint::~Endpoint() +{ +} + +/* + * Set up a new security context. + */ +ref rxrpc::new_security(const std::string &key_name, + enum rxrpc::security_auth_level level) +{ + if (level < security_unset || + level > security_encrypt) + throw std::invalid_argument("Invalid security level"); + + return rxrpc_transport->new_security(key_name, level); +} + +/* + * Construct a server call.. 
+ */
+rxrpc::Call::Call()
+{
+	/* No peer is known yet, so the address starts out blank. */
+	memset(&peer, 0, sizeof(peer));
+	peer_len = 0;
+	service_id = 0;
+	state = call_cl_not_started;
+	completed = call_incomplete;
+	abort_code = 0;
+	error = 0;
+	client_side = false;
+	split = false;
+	exclusive = false;
+	upgrade_service = false;
+	have_rx_timestamp = false;
+	memset(&rx_timestamp, 0, sizeof(rx_timestamp));
+	completion_func = NULL;
+	throw_abort_func = rxrpc::throw_abort;
+	total_tx_size = 0;
+	encode_error = 0;
+	dec_func = NULL;
+}
+
+/*
+ * Construct a client call.
+ *
+ * The endpoint, peer address and the exclusive/upgrade_service flags are taken
+ * from params; size becomes the total transmission size and the reply is
+ * handed to rxrpc::receive_reply().
+ */
+rxrpc::Call::Call(struct Call_params *params, const char *op_name, size_t size)
+	: Rx_queue(op_name)
+{
+	endpoint = params->endpoint;
+	peer = params->peer;
+	/* NOTE(review): params->peer_len is not used here - confirm that the
+	 * full structure size is what is wanted.
+	 */
+	peer_len = sizeof(peer);
+	service_id = params->peer.srx_service;
+	state = call_cl_not_started;
+	completed = call_incomplete;
+	abort_code = 0;
+	error = 0;
+	client_side = true;
+	split = false;
+	exclusive = params->exclusive;
+	upgrade_service = params->upgrade_service;
+	have_rx_timestamp = false;
+	memset(&rx_timestamp, 0, sizeof(rx_timestamp));
+	completion_func = NULL;
+	throw_abort_func = rxrpc::throw_abort;
+	total_tx_size = size;
+	encode_error = 0;
+	dec_func = rxrpc::receive_reply;
+}
+
+/*
+ * Set up a client call and prepare for encoding.
+ *
+ * The call is created on params->endpoint with the size of the already
+ * encoded request in buf.
+ *
+ * Throws std::invalid_argument if an argument or the endpoint is missing, if
+ * the peer address length is out of range or if the address family is not
+ * AF_RXRPC.
+ */
+ref rxrpc::make_call(struct Call_params *params,
+		     const char *op_name,
+		     Enc_buffer *buf)
+{
+	ref call;
+
+	if (!params || !buf)
+		throw std::invalid_argument("Missing call parameters or request buffer");
+
+	if (!params->endpoint)
+		throw std::invalid_argument("No local endpoint for call");
+
+	/* Report the two length failures separately so the message is right. */
+	if (params->peer_len < 5 * 2)
+		throw std::invalid_argument("Peer address too small");
+	if (params->peer_len > sizeof(call->peer))
+		throw std::invalid_argument("Peer address too large");
+
+	if (params->peer.srx_family != AF_RXRPC)
+		throw std::invalid_argument("Peer address family unsupported");
+
+	call = params->endpoint->new_client_call(params, op_name, buf->size);
+
+	debug_op("Send %s\n", op_name);
+	return call;
+}
+
+/*
+ * Call destructor
+ */
+rxrpc::Call::~Call()
+{
+}
+
+/*
+ * Reply decoder.
+ */ +void rxrpc::receive_reply(Call *call) +{ + if (call->state == rxrpc::Call::call_completed && + call->completed == rxrpc::Call::call_success) + debug("Reply received\n"); +} + +/* + * End of reception of request. There should be no more data in the buffers. + */ +void rxrpc::Call::request_received() +{ + switch (state) { + case call_sv_receiving_params: + default: + fprintf(stderr, "rxrpc::request_received() called in state %u\n", + state); + break; + } +} diff --git a/lib/rxrpc_xdr.C b/lib/rxrpc_xdr.C new file mode 100644 index 0000000..00a272f --- /dev/null +++ b/lib/rxrpc_xdr.C @@ -0,0 +1,325 @@ +/* XDR coding and decoding. + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#include +#include +#include "rxrpc.H" + +using rxrpc::ref; + +namespace rxrpc { + +static inline size_t calc_padding(size_t x) +{ + return (4 - (x & 3)) & 3; +} + +static void enc_advance_iov_by_2(Enc_buffer *); +static void dec_string_copy(void *, size_t, const void *, size_t); +static void dec_ignore_padding(void *, size_t, const void *, size_t); +} + +#define debug(fmt, ...) \ + do { if (rxrpc::debug_xdr) printf("XDR: " fmt, ## __VA_ARGS__); } while (0) +#define debug_buffer(fmt, ...) \ + do { if (rxrpc::debug_buffers) printf("BUF: " fmt, ## __VA_ARGS__); } while (0) + +#define UAENOMEM 0x2f6df0b + +/* + * Set up an encoded buffer. 
+ */
+ref rxrpc::alloc_enc_buffer(Resv &resv)
+{
+	ref buf;
+	size_t n_words;
+
+	/* A reservation with no iovec slots cannot be encoded into. */
+	if (resv.max_ioc == 0)
+		abort();
+
+	buf = new Enc_buffer;
+	buf->msg_iov = new iovec[resv.max_ioc];
+
+	/* The buffer is made of whole XDR words, so round the byte count up. */
+	n_words = (resv.alloc + 3) / 4;
+	buf->enc_buf = new net_xdr_t[n_words];
+	buf->enc_buf_p = buf->enc_buf;
+	buf->enc_buf_end = buf->enc_buf + n_words;
+
+	buf->flat = resv.flat;
+	buf->size = resv.alloc + resv.additional;
+
+	/* The first iovec starts at the beginning of the encoding buffer. */
+	buf->msg_iov[0].iov_base = buf->enc_buf_p;
+
+	debug_buffer("Tx ALLOC s=%zx a=%zx ioc=%u\n", buf->size, resv.alloc, resv.max_ioc);
+	return buf;
+}
+
+/*
+ * Release the iovec array and the encoding buffer.
+ */
+rxrpc::Enc_buffer::~Enc_buffer()
+{
+	delete [] msg_iov;
+	delete [] enc_buf;
+}
+
+/*
+ * Finish the current iovec and skip over one slot, which the caller fills in
+ * with a blob, so that encoding continues in the slot after that.
+ */
+static void rxrpc::enc_advance_iov_by_2(Enc_buffer *buf)
+{
+	iovec *cur = &buf->msg_iov[buf->ioc];
+
+	/* The base may not be 4-byte aligned if this slot began with blob
+	 * padding, hence the byte-wise subtraction.
+	 */
+	cur->iov_len = (unsigned long)buf->enc_buf_p - (unsigned long)cur->iov_base;
+
+	buf->ioc += 2;
+	buf->msg_iov[buf->ioc].iov_base = buf->enc_buf_p;
+}
+
+/*
+ * Encode an opaque blob to XDR. We copy in the blob if it's small, otherwise
+ * we assign it its own iov slot.
+ */ +void rxrpc::enc_data(Enc_buffer *buf, + const void *blob, unsigned int size, unsigned int max) +{ + unsigned int ioc; + size_t padsize = calc_padding(size); + size_t total = size + padsize; + + enc(buf, size); + + if (size == 0) { + /* Nothing to store */ + } else if (reserve_opaque_inline(buf->flat, size, max)) { + if (size & 3) + buf->enc_buf_p[size >> 2] = 0; /* Clear padding */ + memcpy(buf->enc_buf_p, blob, size); + buf->enc_buf_p += total / sizeof(net_xdr_t); + } else { + enc_advance_iov_by_2(buf); + ioc = buf->ioc; + buf->msg_iov[ioc - 1].iov_len = size; + buf->msg_iov[ioc - 1].iov_base = (void *)blob; + if (padsize > 0) { + enc(buf, 0); + buf->msg_iov[ioc].iov_base += 4 - padsize; + buf->msg_iov[ioc].iov_len += padsize; + } + } +} + +void rxrpc::Call::dec_underrun(size_t size) +{ + if (client_side) + throw std::runtime_error( + fmt::format("rxrpc XDR decoder underrun ({:x}/{:x}) on {}", + dec_amount, size, op_name)); + else + throw AbortRXGEN_SS_UNMARSHAL(op_name); +} + +rxrpc::Rx_queue::Rx_queue(const char *opname) : op_name(opname) +{ + dec_index = 0; + dec_amount = 0; +} + +rxrpc::Rx_queue::~Rx_queue() +{ +} + + +void rxrpc::Rx_queue::overrun() +{ +#if 0 + if (state == call_completed && + completed != call_success) + return; +#endif + printf("Partial fail\n"); +} + +/* + * Default copier for rxrpc::Call::read. + */ +void rxrpc::Rx_queue::read_copy(void *buffer, size_t offset, const void *data, size_t len) +{ + memcpy(buffer + offset, data, len); +} + +/* + * Read data from the receive buffers. 
+ */ +void rxrpc::Rx_queue::read(void *buffer, size_t buffer_size, + void (*copy)(void *buffer, size_t offset, const void *data, size_t len)) +{ + unsigned int seg, i; + size_t offset = 0; + + debug("READ %zu/%zu\n", buffer_size, dec_amount); + + if (buffer_size > dec_amount) + return overrun(); + + while (offset < buffer_size && !dec_buffers.empty()) { + Rx_buffer &b = dec_buffers.front(); + + seg = b.len - b.used; + if (seg > buffer_size - offset) + seg = buffer_size - offset; + + debug("B[%u] %zu/%zu from %u/%zu\n", + b.index, offset, buffer_size, seg, dec_amount); + + (*copy)(buffer, offset, b.buf + b.used, seg); + offset += seg; + b.used += seg; + dec_amount -= seg; + + if (b.used == b.len) { + dec_buffers.pop_front(); + } else { + if (offset != buffer_size) + ::abort(); + } + } + + if (offset < buffer_size) { + fprintf(stderr, "Inconsistent amount of buffered rx data\n"); + ::abort(); + } + + if (dec_buffers.empty() && dec_amount != 0) { + fprintf(stderr, "dec_amount should be zero\n"); + ::abort(); + } + + if (debug_xdr) { + seg = buffer_size <= 32 ? buffer_size : 32; + debug(""); + for (i = 0; i < seg; i++) + printf("%02x", ((unsigned char *)buffer)[i]); + printf("\n"); + } +} + +/* + * Decode a 32-bit integer. + */ +uint32_t rxrpc::dec(Rx_queue &rxq) +{ + uint32_t x; + + rxq.read(&x, sizeof(x)); + return ntohl(x); +} + +/* + * Decode a 32-bit integer and check limit. 
+ */ +uint32_t rxrpc::dec(Rx_queue &rxq, size_t limit) +{ + uint32_t x; + + x = dec(rxq); + if (x > limit) { + if (rxq.client_side) { + throw std::runtime_error( + fmt::format("XDR: Decoded value exceeded limit ({:x}/{:x}) on {}", + x, limit, rxq.op_name)); + } else { + throw AbortRXGEN_SS_UNMARSHAL(rxq.op_name); + } + } + + return x; +} + +static void rxrpc::dec_string_copy(void *buffer, size_t offset, const void *data, size_t len) +{ + std::string *s = (std::string *)buffer; + + s->insert(offset, (const char *)data, len); +} + +static void rxrpc::dec_ignore_padding(void *buffer, size_t offset, const void *data, size_t len) +{ +} + +/* + * Decode a string. + */ +void rxrpc::dec_string(Rx_queue &rxq, std::string &s, size_t max) +{ + size_t len; + + len = dec(rxq, max); + s.reserve(len); + rxq.read(&s, len, dec_string_copy); + len &= 3; + if (len) + rxq.read(NULL, 4 - len, dec_ignore_padding); +} + +/* + * Decode an opaque + */ +void rxrpc::dec_opaque(Rx_queue &rxq, Opaque &data, size_t max) +{ + size_t len; + + len = dec(rxq, max); + data.reserve(len); + rxq.read(data.buffer, len); + data.buffer_size = len; + len &= 3; + if (len) + rxq.read(NULL, 4 - len, dec_ignore_padding); +} + +/* + * Construct an abort exception class. + */ +rxrpc::rx_abort::rx_abort(int abort, const char *name, const char *desc, const char *op) + : std::runtime_error(fmt::format("RPC {} aborted with {}", op, name)), + abort_code(abort), abort_name(name), op_name(op) +{ + if (debug_ops) + printf("OP: Rx ABORT: %d %s\n", abort_code, abort_name); +} + +/* + * Throw an exception appropriate to the abort code. 
+ */ +void rxrpc::throw_abort(int abort_code, const char *op) +{ + switch (abort_code) { + case RX_CALL_DEAD: throw AbortRX_CALL_DEAD(op); + case RX_INVALID_OPERATION: throw AbortRX_INVALID_OPERATION(op); + case RX_CALL_TIMEOUT: throw AbortRX_CALL_TIMEOUT(op); + case RX_EOF: throw AbortRX_EOF(op); + case RX_PROTOCOL_ERROR: throw AbortRX_PROTOCOL_ERROR(op); + case RX_USER_ABORT: throw AbortRX_USER_ABORT(op); + case RX_ADDRINUSE: throw AbortRX_ADDRINUSE(op); + case RX_DEBUGI_BADTYPE: throw AbortRX_DEBUGI_BADTYPE(op); + case RXGEN_CC_MARSHAL: throw AbortRXGEN_CC_MARSHAL(op); + case RXGEN_CC_UNMARSHAL: throw AbortRXGEN_CC_UNMARSHAL(op); + case RXGEN_SS_MARSHAL: throw AbortRXGEN_SS_MARSHAL(op); + case RXGEN_SS_UNMARSHAL: throw AbortRXGEN_SS_UNMARSHAL(op); + case RXGEN_DECODE: throw AbortRXGEN_DECODE(op); + case RXGEN_OPCODE: throw AbortRXGEN_OPCODE(op); + case RXGEN_SS_XDRFREE: throw AbortRXGEN_SS_XDRFREE(op); + case RXGEN_CC_XDRFREE: throw AbortRXGEN_CC_XDRFREE(op); + default: + throw unknown_abort(abort_code, op); + } +} diff --git a/lib/transport.H b/lib/transport.H new file mode 100644 index 0000000..7887261 --- /dev/null +++ b/lib/transport.H @@ -0,0 +1,75 @@ +/* Access points into the dynamically loaded Rx transport module + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. 
+ */
+
+#ifndef _RXRPC_TRANSPORT_H
+#define _RXRPC_TRANSPORT_H
+
+#include
+#include
+#include
+#include "rxrpc.H"
+
+namespace rxrpc {
+
+/*
+ * Outcome of a receive operation.
+ * NOTE(review): nothing in this file uses these values; presumably the
+ * transport implementation reports them - confirm there.
+ */
+enum recv_condition {
+	recv_more,
+	recv_final_ack,
+	recv_complete,
+	recv_complete_now_send,
+	recv_abort,
+	recv_net_error,
+	recv_local_error,
+	recv_busy,
+};
+
+/* Server-side callback type; compiled out at present. */
+#if 0
+typedef void (*incoming_call_func_t)(struct service *service,
+				     void *caller_data,
+				     const struct sockaddr_rxrpc *peer,
+				     socklen_t *peerlen);
+#endif
+
+/*
+ * Abstract interface that an Rx transport implements.  An instance is
+ * obtained from create_af_rxrpc_transport(), declared at the end of this
+ * file.
+ */
+class Transport : public refcount {
+public:
+	/* Name of the transport. */
+	virtual std::string name() const = 0;
+
+	/* Create a local endpoint for the given local address and security. */
+	virtual ref new_local_endpoint(const struct sockaddr_rxrpc *local_addr,
+				       Security *security) = 0;
+
+	/* Create a security context from a key name and a security level. */
+	virtual ref new_security(const std::string &key_name,
+				 enum rxrpc::security_auth_level level) = 0;
+
+	/* Server-side entry points; compiled out at present. */
+#if 0
+	virtual int new_service(Endpoint *&endpoint,
+				Service *service,
+				incoming_call_func_t new_call_func,
+				void *caller_data,
+				uint16_t local_port,
+				uint16_t local_service,
+				Security *permits) = 0;
+
+	virtual void shutdown_service(Service *service) = 0;
+
+	virtual int accept_call(Call *call,
+				Service *service,
+				void *caller_data,
+				struct sockaddr_rxrpc *_sa,
+				socklen_t *_salen) = 0;
+#endif
+
+	virtual ~Transport();
+};
+
+} /* end namespace rxrpc */
+
+/* Create the AF_RXRPC transport. */
+extern rxrpc::ref create_af_rxrpc_transport();
+
+#endif /* _RXRPC_TRANSPORT_H */