From a9d57cf7e834c089adbb85291f0831ba5f3ba064 Mon Sep 17 00:00:00 2001 From: David Howells Date: Mon, 30 Sep 2019 10:13:00 +0100 Subject: [PATCH] Implement basic I/O on top of an AF_RXRPC socket Implement basic I/O on top of an AF_RXRPC socket so that the codecs generated by rxgen.py will work. Signed-off-by: David Howells --- .gitignore | 4 + lib/Makefile | 21 +- lib/af_rxrpc.c | 935 +++++++++++++++++++++++++++++++++++++++++++++++ lib/list.h | 286 +++++++++++++++ lib/rxrpc.h | 145 +++++++- lib/rxrpc_core.c | 326 +++++++++++++++++ lib/rxrpc_xdr.c | 256 +++++++++++++ lib/transport.h | 97 +++++ 8 files changed, 2063 insertions(+), 7 deletions(-) create mode 100644 lib/af_rxrpc.c create mode 100644 lib/list.h create mode 100644 lib/rxrpc_core.c create mode 100644 lib/rxrpc_xdr.c create mode 100644 lib/transport.h diff --git a/.gitignore b/.gitignore index d7c40ee..a6a6430 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ *~ +*.o +.*.o.d +*.so lib/afs_xg.* +lib/libkafs_utils.* rxgen/__pycache__/ rxgen/parsetab.py diff --git a/lib/Makefile b/lib/Makefile index a904339..e50daab 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -2,11 +2,26 @@ RXGEN := ../rxgen/rxgen.py RPC_HEADERS := $(wildcard ../rpc-api/*.h) RPC_DEFS := $(filter-out %test.xg, $(sort $(wildcard ../rpc-api/*.xg))) -CFLAGS := -g -Wall -Wformat -fpic +CFLAGS := -g -Wall -Wformat -fpic -O2 -#RPC_DEFS := ../rpc-api/test.xg +LIB_SRCS := afs_xg.c rxrpc_xdr.c af_rxrpc.c rxrpc_core.c +LIB_OBJS := $(patsubst %.c,%.o,$(LIB_SRCS)) -afs_xg.o: afs_xg.c afs_xg.h rxrpc.h +%.o: %.c + $(CC) $(CPPFLAGS) -MMD -MF .$@.d $(CFLAGS) -o $@ -c $< + +all: libkafs_utils.a + +libkafs_utils.a: $(LIB_OBJS) + $(AR) rcs -o $@ $(LIB_OBJS) afs_xg.c afs_xg.h: $(RPC_HEADERS) $(RPC_DEFS) $(wildcard ../rxgen/*.py) Makefile $(RXGEN) $(RPC_HEADERS) $(RPC_DEFS) + +DEPS := $(wildcard .*.o.d) +ifneq ($(DEPS),) +include $(DEPS) +endif + +clean: + $(RM) *~ *.o afs_xg.[ch] *.so *.a $(DEPS) diff --git a/lib/af_rxrpc.c b/lib/af_rxrpc.c new file mode 100644 index 0000000..814e2b0 --- /dev/null +++ b/lib/af_rxrpc.c @@ -0,0 +1,935 @@ +/* AF_RXRPC driver + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#define _XOPEN_SOURCE +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "transport.h" + +#define debug(fmt, ...) \ + do { if (rxrpc_debug_transport) printf("AFRX: "fmt, ## __VA_ARGS__); } while (0) +#define debug_buffer(fmt, ...) \ + do { if (rxrpc_debug_buffers) printf("BUF: "fmt, ## __VA_ARGS__); } while (0) + +#define RXRPC_SELECT_CALL_FOR_RECV 7 /* Specify the call for recvmsg, SIOCINQ, etc. */ +#define RXRPC_SELECT_CALL_FOR_SEND 8 /* Specify the call for splice, SIOCOUTQ, etc. */ + +#define RXRPC_SET_SECURITY_KEY 14 +#define RXRPC_SET_SECURITY_LEVEL 15 + +/* + * Add a call ID to a control message sequence. + */ +#define RXRPC_ADD_CALLID(control, ctrllen, id) \ +do { \ + void *__buffer = (control); \ + unsigned long *__data; \ + struct cmsghdr *__cmsg; \ + __cmsg = __buffer + (ctrllen); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_USER_CALL_ID; \ + __data = (void *)CMSG_DATA(__cmsg); \ + *__data = (id); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add an abort instruction to a control message sequence. + */ +#define RXRPC_ADD_ABORT(control, ctrllen, abort_code) \ +do { \ + void *__buffer = (control); \ + unsigned int *__data; \ + struct cmsghdr *__cmsg; \ + __cmsg = __buffer + (ctrllen); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_ABORT; \ + __data = (void *)CMSG_DATA(__cmsg); \ + *__data = (abort_code); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add a security level instruction to a control message sequence. + */ +#define RXRPC_ADD_SECURITY_LEVEL(control, ctrllen, sec_level) \ +do { \ + void *__buffer = (control); \ + unsigned int *__data; \ + struct cmsghdr *__cmsg; \ + __cmsg = __buffer + (ctrllen); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(*__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_SET_SECURITY_LEVEL; \ + __data = (void *)CMSG_DATA(__cmsg); \ + *__data = (sec_level); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add a total transmit length instruction to a control message sequence. + */ +#define RXRPC_ADD_TX_LENGTH(control, ctrllen, length) \ +do { \ + void *__buffer = (control); \ + unsigned long long __data = (length); \ + struct cmsghdr *__cmsg; \ + __cmsg = __buffer + (ctrllen); \ + __cmsg->cmsg_len = CMSG_LEN(sizeof(__data)); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = RXRPC_TX_LENGTH; \ + memcpy(CMSG_DATA(__cmsg), &__data, sizeof(__data)); \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +/* + * Add a miscellaneous option to a control message sequence. + */ +#define RXRPC_ADD_MISC_OPTION(control, ctrllen, opt) \ +do { \ + void *__buffer = (control); \ + struct cmsghdr *__cmsg; \ + __cmsg = __buffer + (ctrllen); \ + __cmsg->cmsg_len = CMSG_LEN(0); \ + __cmsg->cmsg_level = SOL_RXRPC; \ + __cmsg->cmsg_type = opt; \ + (ctrllen) += CMSG_ALIGN(__cmsg->cmsg_len); \ +} while (0) + +struct rxrpc_transport_private { + struct sockaddr_rxrpc local; + int fd; + unsigned int sec_level; + + /* Message reception infrastructure */ + struct list_head rx_spare_buffers; /* Spare bufferage for receive */ + unsigned int rx_nr_buffers; /* Number of buffers on the list */ +#define CONN_MSGS 3 +#define CONN_CMSG_SIZE 128 +#define CONN_NR_IOV 4 + struct mmsghdr rx_msgs[CONN_MSGS]; /* Message headers */ + struct sockaddr_rxrpc rx_peer[CONN_MSGS]; /* Mass peer address bufferage */ + struct iovec rx_iov[CONN_MSGS * CONN_NR_IOV]; /* Mass data bufferage */ + unsigned char rx_cmsg[CONN_MSGS * CONN_CMSG_SIZE]; /* Mass control message bufferage */ +}; + +struct rxrpc_security_private { + unsigned int min_sec_level; /* RXRPC_SECURITY_* value */ +}; + +struct rxrpc_call_private { + bool known_to_kernel; +}; + +/* + * dump the control messages + */ +static __attribute__((unused)) +void dump_cmsg(const char *dir, struct msghdr *msg) +{ + struct timespec tv; + struct cmsghdr *cmsg; + unsigned long long tx_len; + unsigned long user_id; + unsigned char *p; + int abort_code; + int n; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); + p = CMSG_DATA(cmsg); + + debug("%s CMSG: %zu: ", dir, cmsg->cmsg_len); + + switch (cmsg->cmsg_level) { + case SOL_SOCKET: + switch (cmsg->cmsg_type) { + case SO_TIMESTAMPNS_NEW: + printf("SO_TIMESTAMPNS_NEW: "); + if (n != sizeof(tv)) + goto dump_data; + memcpy(&tv, p, sizeof(tv)); + printf("%lu.%lu\n", tv.tv_sec, tv.tv_nsec); + continue; + default: + printf("SOCKET_%d", cmsg->cmsg_type); + break; + } + break; + + case SOL_RXRPC: + switch (cmsg->cmsg_type) { + case RXRPC_USER_CALL_ID: + printf("RXRPC_USER_CALL_ID: "); + if (n != sizeof(user_id)) + goto dump_data; + memcpy(&user_id, p, sizeof(user_id)); + printf("%lx\n", user_id); + continue; + + case RXRPC_ABORT: + printf("RXRPC_ABORT: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%d\n", abort_code); + continue; + + case RXRPC_ACK: + printf("RXRPC_ACK"); + if (n != 0) + goto dump_data_colon; + goto print_nl; + + case RXRPC_NET_ERROR: + printf("RXRPC_NET_ERROR: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%s\n", strerror(abort_code)); + continue; + + case RXRPC_BUSY: + printf("RXRPC_BUSY"); + if (n != 0) + goto dump_data_colon; + goto print_nl; + + case RXRPC_LOCAL_ERROR: + printf("RXRPC_LOCAL_ERROR: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%s\n", strerror(abort_code)); + continue; + + case RXRPC_UPGRADE_SERVICE: + printf("RXRPC_UPGRADE_SERVICE\n"); + continue; + case RXRPC_TX_LENGTH: + printf("RXRPC_TX_LENGTH: "); + if (n != sizeof(tx_len)) + goto dump_data; + memcpy(&tx_len, p, sizeof(tx_len)); + printf("%llu\n", tx_len); + continue; + case RXRPC_SET_CALL_TIMEOUT: + printf("RXRPC_SET_CALL_TIMEOUT\n"); + continue; + default: + printf("RXRPC_%d", cmsg->cmsg_type); + break; + } + break; + + default: + printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type); + break; + } + + + dump_data_colon: + printf(": "); + dump_data: + printf("{"); + for (; n > 0; n--, p++) + printf("%02x", *p); + + print_nl: + printf("}\n"); + } +} + +/* + * Initialise the transport module + */ +static bool af_rxrpc_create_local_endpoint(struct rxrpc_local_endpoint *endpoint) +{ + struct rxrpc_transport_private *trans; + int opt, i; + + if (endpoint->addr_len > 0) { + if (endpoint->addr_len > sizeof(trans->local.transport)) { + errno = EINVAL; + return false; + } + + switch (endpoint->addr.transport.family) { + case AF_INET: + case AF_INET6: + break; + default: + fprintf(stderr, "Unsupported transport type\n"); + errno = EPROTOTYPE; + return false; + } + } + + trans = calloc(1, sizeof(*trans)); + if (!trans) + return false; + + init_list_head(&trans->rx_spare_buffers); + + /* Set up the message descriptors */ + for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) + trans->rx_iov[i].iov_len = RXGEN_BUFFER_SIZE; + + for (i = 0; i < CONN_MSGS; i++) { + struct mmsghdr *m = &trans->rx_msgs[i]; + + m->msg_hdr.msg_iov = trans->rx_iov + i * CONN_NR_IOV; + m->msg_hdr.msg_iovlen = CONN_NR_IOV; + m->msg_hdr.msg_name = &trans->rx_peer[i]; + m->msg_hdr.msg_namelen = sizeof(trans->rx_peer[i]); + m->msg_hdr.msg_control = trans->rx_cmsg; + m->msg_hdr.msg_controllen = CONN_CMSG_SIZE; + m->msg_hdr.msg_flags = 0; + } + + /* Open up a socket for talking to the AF_RXRPC module */ + trans->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET6); + if (trans->fd < 0) + goto error_priv; + + opt = 1; + if (setsockopt(trans->fd, SOL_SOCKET, SO_TIMESTAMPNS_NEW, + (char *)&opt, sizeof(opt)) == -1) { + perror("SO_TIMESTAMPNS_NEW"); + goto error_sock; + } + + if (endpoint->addr_len > 0) { + struct sockaddr_rxrpc *srx = &trans->local; + + srx->srx_family = AF_RXRPC; + srx->transport_type = SOCK_DGRAM; + srx->transport_len = endpoint->addr_len; + memcpy(&srx->transport, &endpoint->addr, endpoint->addr_len); + + if (bind(trans->fd, (struct sockaddr *)srx, sizeof(*srx)) == -1) + goto error_sock; + } + + endpoint->priv = trans; + return true; + +error_sock: + close(trans->fd); +error_priv: + free(trans); + return false; +} + +/* + * Free the transport module + */ +static void af_rxrpc_free_local_endpoint(struct rxrpc_local_endpoint *endpoint) +{ + struct rxrpc_transport_private *trans = endpoint->priv; + struct rxrpc_rx_buf *b; + int i; + + close(trans->fd); + + for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) + free(trans->rx_iov[i].iov_base); + + while (!list_empty(&trans->rx_spare_buffers)) { + b = list_entry(trans->rx_spare_buffers.next, struct rxrpc_rx_buf, link); + list_del(&b->link); + free(b->buf); + free(b); + } + + free(trans); +} + +/* + * Set up a security descriptor. Note that the key name can be NULL (for + * instance if -noauth was passed). + */ +static bool af_rxrpc_new_security(struct rxrpc_local_endpoint *endpoint, + struct rxrpc_security *security, + struct rxrpc_result *_result) +{ + struct rxrpc_transport_private *trans = endpoint->priv; + struct rxrpc_security_private *sec; + + sec = calloc(1, sizeof(*sec)); + if (!sec) + return rxrpc_nomem(_result); + + switch (security->level) { + case rxrpc_security_no_auth: + break; + case rxrpc_security_local_auth: + fprintf(stderr, "Local auth not supported by AF_RXRPC transport\n"); + goto inval; + case rxrpc_security_unset: + case rxrpc_security_clear: + if (security->key_name_len == 0) + goto inval; + sec->min_sec_level = RXRPC_SECURITY_PLAIN; + break; + case rxrpc_security_integrity_only: + if (security->key_name_len == 0) + goto inval; + sec->min_sec_level = RXRPC_SECURITY_AUTH; + break; + case rxrpc_security_encrypt: + if (security->key_name_len == 0) + goto inval; + sec->min_sec_level = RXRPC_SECURITY_ENCRYPT; + break; + } + + if (security->level != rxrpc_security_no_auth) { + debug("OPT RXRPC_SECURITY_KEY %s\n", security->key_name); + if (setsockopt(trans->fd, SOL_RXRPC, RXRPC_SECURITY_KEY, + security->key_name, security->key_name_len) < 0) { + free(sec); + return rxrpc_syserror(_result, "setsockopt(RXRPC_SECURITY_KEY)"); + } + + debug("OPT RXRPC_MIN_SECURITY_LEVEL %u\n", sec->min_sec_level); + if (setsockopt(trans->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL, + &sec->min_sec_level, sizeof(sec->min_sec_level)) < 0) { + free(sec); + return rxrpc_syserror(_result, "setsockopt(RXRPC_MIN_SECURITY_LEVEL)"); + } + } + + trans->sec_level = sec->min_sec_level; + security->priv = sec; + return true; + +inval: + free(sec); + _result->source = rxrpc_error_from_parameters; + _result->error = EINVAL; + return false; +} + +/* + * Free a security descriptor + */ +static void af_rxrpc_free_security(struct rxrpc_security *security) +{ + free(security->priv); +} + +/* + * Allocate a call handle for a new call. + */ +static bool af_rxrpc_make_call(struct rxrpc_call *call, struct rxrpc_result *_result) +{ + struct rxrpc_call_private *ch; + + ch = calloc(1, sizeof(*call)); + if (!ch) + return rxrpc_nomem(_result); + + call->priv = ch; + return true; +} + +/* + * Dump data from an output buffer. + */ +static void dump_enc_buffers(struct rxrpc_buffer *buf) +{ + unsigned int pos = 0; + bool nl = false; + int ioc, i; + + for (ioc = 0; ioc < buf->ioc; ioc++) { + for (i = 0; i < buf->msg_iov[ioc].iov_len; i++) { + if (!(pos & 3)) { + if (!(pos & 31)) + printf("%06x: ", pos); + else + printf(" "); + } + + printf("%02x", ((unsigned char *)buf->msg_iov[ioc].iov_base)[i]); + pos++; + nl = false; + if ((pos & 31) == 0) { + printf("\n"); + nl = true; + } + } + } + + if (!nl) + printf("\n"); +} + +/* + * Send buffered data. + */ +static bool af_rxrpc_send_data(struct rxrpc_call *call, + struct rxrpc_buffer *buf, + bool more, + struct rxrpc_result *_result) +{ + struct msghdr msg; + unsigned char control[128]; + unsigned int flags = 0, tmp; + socklen_t olen; + size_t ctrllen; + char abuf[512]; + int ret, i; + + if (rxrpc_debug_transport) + printf("\n"); + + /* Request an operation */ + ctrllen = 0; + RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call); + if (call->exclusive) + RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_EXCLUSIVE_CALL); + if (call->upgrade_service) + RXRPC_ADD_MISC_OPTION(control, ctrllen, RXRPC_UPGRADE_SERVICE); + if (call->total_tx_size) { + debug("total Tx size %llu\n", call->total_tx_size); + RXRPC_ADD_TX_LENGTH(control, ctrllen, call->total_tx_size); + } + + msg.msg_name = &call->peer; + msg.msg_namelen = call->peer_len; + msg.msg_iov = buf ? buf->msg_iov : NULL; + msg.msg_iovlen = buf ? buf->ioc : 0; + msg.msg_control = control; + msg.msg_controllen = ctrllen; + msg.msg_flags = 0; + + if (more) + flags |= MSG_MORE; + + switch (call->peer.transport.family) { + case AF_INET: + inet_ntop(call->peer.transport.family, &call->peer.transport.sin.sin_addr, + abuf, sizeof(abuf)); + debug("Tx %u Addr %s:%u\n", + call->peer.srx_service, abuf, ntohs(call->peer.transport.sin.sin_port)); + break; + case AF_INET6: + inet_ntop(call->peer.transport.family, &call->peer.transport.sin6.sin6_addr, + abuf, sizeof(abuf)); + debug("Tx %u Addr [%s]:%u\n", + call->peer.srx_service, abuf, ntohs(call->peer.transport.sin6.sin6_port)); + break; + } + + if (rxrpc_debug_transport) { + dump_cmsg("Tx", &msg); + for (i = 0; i < msg.msg_iovlen; i++) + debug("Tx IOV[%02u] %04zu %p\n", + i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base); + } + if (rxrpc_debug_data) + dump_enc_buffers(buf); + + olen = sizeof(tmp); + getsockopt(call->endpoint->priv->fd, SOL_SOCKET, SO_ERROR, (char *)&tmp, &olen); + + /* Send the data */ + ret = sendmsg(call->endpoint->priv->fd, &msg, flags); + if (ret < 0) { + debug("SENDMSG: %m\n"); + return rxrpc_syserror(_result, "sendmsg"); + } + + debug("SENDMSG: %d%s\n", ret, more ? " [more]" : ""); + call->priv->known_to_kernel = true; + return ret >= 0; +} + +/* + * Process the metadata given to us by recvmsg(). + */ +static bool af_rxrpc_process_anciliary(struct msghdr *msg, + struct rxrpc_call **_call, + struct rxrpc_result *result) +{ + struct rxrpc_call *call = NULL; + struct cmsghdr *cmsg; + unsigned long tmp; + enum rxrpc_recv_condition cond; + + if (rxrpc_debug_transport) + dump_cmsg("Rx", msg); + + if (msg->msg_flags & MSG_MORE) + cond = rxrpc_recv_more; + else if (msg->msg_flags & MSG_EOR) + cond = rxrpc_recv_complete; + else + cond = rxrpc_recv_complete_now_send; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + unsigned char *p = CMSG_DATA(cmsg); + int n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); + + if (cmsg->cmsg_level == SOL_SOCKET) { + switch (cmsg->cmsg_type) { + case SO_TIMESTAMPNS_NEW: + if (call->have_rx_timestamp) + continue; + + if (n != sizeof(call->rx_timestamp)) { + fprintf(stderr, "SO_TIMESTAMPNS wrong size\n"); + abort(); + } + + memcpy(&call->rx_timestamp, p, n); + call->have_rx_timestamp = true; + debug("Rx tstamp\n"); + break; + } + } + + if (cmsg->cmsg_level != SOL_RXRPC) + continue; + + switch (cmsg->cmsg_type) { + case RXRPC_USER_CALL_ID: + if (n != sizeof(tmp)) { + fprintf(stderr, "User call ID wrong size\n"); + abort(); + } + + memcpy(&tmp, p, sizeof(tmp)); + call = (struct rxrpc_call *)tmp; + *_call = call; + break; + + case RXRPC_ABORT: + if (n == 4) + memcpy(&call->abort_code, p, 4); + call->error = ECONNABORTED; + result->source = rxrpc_error_remote_abort; + result->error = ECONNABORTED; + result->abort_code = call->abort_code; + cond = rxrpc_recv_abort; + break; + + case RXRPC_NET_ERROR: + if (n != sizeof(call->error)) + goto parse_error; + memcpy(&call->error, p, 4); + result->source = rxrpc_error_from_network; + result->error = call->error; + result->abort_code = 0; + cond = rxrpc_recv_net_error; + break; + + case RXRPC_LOCAL_ERROR: + if (n != sizeof(call->error)) + goto parse_error; + memcpy(&call->error, p, 4); + result->source = rxrpc_error_from_system; + result->error = call->error; + result->abort_code = 0; + cond = rxrpc_recv_local_error; + break; + + case RXRPC_BUSY: + result->source = rxrpc_error_peer_busy; + result->error = ECONNREFUSED; + result->abort_code = 0; + cond = rxrpc_recv_busy; + break; + + case RXRPC_ACK: + cond = rxrpc_recv_final_ack; + break; + + default: + fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type); + break; + } + } + + if (msg->msg_flags & MSG_EOR) { + debug("Rx EOR\n"); + call->priv->known_to_kernel = false; + } + + /* Adjust the call's state. */ + switch (cond) { + case rxrpc_recv_more: + break; + case rxrpc_recv_complete_now_send: + break; + case rxrpc_recv_final_ack: + if (call->state != rx_call_sv_waiting_for_final_ack) { + fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n", + call->state); + abort(); + } + call->state = rx_call_completed; + call->completed = rx_call_success; + break; + case rxrpc_recv_complete: + call->state = rx_call_completed; + call->completed = rx_call_success; + break; + case rxrpc_recv_abort: + call->state = rx_call_completed; + call->completed = rx_call_remotely_aborted; + break; + case rxrpc_recv_net_error: + call->state = rx_call_completed; + call->completed = rx_call_net_error; + break; + case rxrpc_recv_local_error: + call->state = rx_call_completed; + call->completed = rx_call_local_error; + break; + case rxrpc_recv_busy: + call->error = ECONNREFUSED; + call->state = rx_call_completed; + call->completed = rx_call_rejected_busy; + break; + default: + abort(); + } + + if (call->state == rx_call_completed && call->completion_func) { + call->completion_func(call); + call->completion_func = NULL; + } + + debug("<--af_rxrpc_recv_data() = t [c=%u]\n", cond); + return true; + +parse_error: + fprintf(stderr, "cmsg parse error %u\n", cmsg->cmsg_type); + abort(); +} + +/* + * Process a message. + */ +static void af_rxrpc_recv_msg(struct rxrpc_local_endpoint *endpoint, + struct msghdr *msg, unsigned int len, + struct rxrpc_result *result) +{ + struct rxrpc_transport_private *trans = endpoint->priv; + struct rxrpc_call *call = NULL; + struct iovec *iov = msg->msg_iov; + struct sockaddr_rxrpc *srx = msg->msg_name; + unsigned int tmp; + int i; + + if (!af_rxrpc_process_anciliary(msg, &call, result)) + return; + + if (call->service_id != srx->srx_service) + debug("Upgraded service to %u\n", srx->srx_service); + call->service_id = srx->srx_service; /* Changes if service upgrade */ + result->service_id = srx->srx_service; + + if (len > CONN_NR_IOV * RXGEN_BUFFER_SIZE) { + fprintf(stderr, "Too much data returned\n"); + abort(); + } + + if (rxrpc_debug_data) { + tmp = len; + for (i = 0; i < CONN_NR_IOV && tmp > 0; i++) { + unsigned int seg = (tmp <= iov[i].iov_len) ? tmp : iov[i].iov_len; + bool nl = false; + printf("Rx IOV[%u] %zu %u %p\n", i, iov[i].iov_len, seg, iov[i].iov_base); + + unsigned int j, s = seg; // <= 32 ? seg : 32; + for (j = 0; j < s; j++) { + if (!(j & 3)) { + if (!(j & 31)) + printf("%06x: ", j); + else + printf(" "); + } + printf("%02x", ((unsigned char *)iov[i].iov_base)[j]); + nl = false; + if ((j & 31) == 31) { + printf("\n"); + nl = true; + } + } + if (!nl) + printf("\n"); + tmp -= seg; + } + } + + /* Update the buffers with the amount of data stored. */ + for (i = 0; i < CONN_NR_IOV && len > 0; i++) { + struct rxrpc_rx_buf *b; + unsigned int n; + + b = list_entry(trans->rx_spare_buffers.next, struct rxrpc_rx_buf, link); + b->index = call->dec_index++; + + n = RXGEN_BUFFER_SIZE; + if (n > len) + n = len; + debug("BUF[%u] +%-4u %zu {%u}\n", + b->index, n, call->dec_amount + n, trans->rx_nr_buffers); + b->len = n; + b->buf = iov[i].iov_base; + iov[i].iov_base = NULL; + + trans->rx_nr_buffers--; + list_move_tail(&b->link, &call->dec_buffers); + call->dec_amount += n; + len -= n; + } + + call->dec_func(call); +} + +/* + * Receive data from a variety of calls in one go. + */ +static bool af_rxrpc_recv(struct rxrpc_local_endpoint *endpoint, bool nowait, + struct rxrpc_result *result) +{ + struct rxrpc_transport_private *trans = endpoint->priv; + struct iovec *iov = trans->rx_iov; + int count, i, j; + + if (rxrpc_debug_transport) + printf("\n"); + + /* Tamp down the buffers to make sure we cycle through the memory */ + j = 0; + for (i = 0; i < CONN_MSGS * CONN_NR_IOV; i++) { + if (!iov[i].iov_base) + continue; + iov[j].iov_base = iov[i].iov_base; + } + + for (; j < CONN_MSGS * CONN_NR_IOV; j++) { + iov[j].iov_base = malloc(RXGEN_BUFFER_SIZE); + if (!iov[j].iov_base) + return rxrpc_nomem(result); + } + + /* Make sure we have sufficient buffer descriptors */ + while (trans->rx_nr_buffers < CONN_MSGS * CONN_NR_IOV) { + struct rxrpc_rx_buf *b = calloc(1, sizeof(struct rxrpc_rx_buf)); + if (!b) + return rxrpc_nomem(result); + list_add(&b->link, &trans->rx_spare_buffers); + trans->rx_nr_buffers++; + } + + /* Receive messages. The descriptor array is already set up and gets + * reset as messages are processed. + */ + count = recvmmsg(trans->fd, trans->rx_msgs, CONN_MSGS, + nowait ? MSG_DONTWAIT : MSG_WAITFORONE, + NULL); + debug("RECVMMSG: %d\n", count); + if (count == -1) + return false; + + for (i = 0; i < count; i++) { + struct mmsghdr *m = &trans->rx_msgs[i]; + struct msghdr *msg = &m->msg_hdr; + + debug("Rx DATA: %d [fl:%x]\n", m->msg_len, msg->msg_flags); + + af_rxrpc_recv_msg(endpoint, msg, m->msg_len, result); + + /* Reset the message descriptor. */ + m->msg_hdr.msg_namelen = sizeof(trans->rx_peer[i]); + m->msg_hdr.msg_controllen = CONN_CMSG_SIZE; + m->msg_hdr.msg_flags = 0; + } + + return true; +} + +/* + * Abort a call. + */ +static void af_rxrpc_abort_call(struct rxrpc_call *call, uint32_t abort_code) +{ + struct msghdr msg; + size_t ctrllen; + unsigned char control[128]; + + if (call->priv->known_to_kernel) { + memset(control, 0, sizeof(control)); + ctrllen = 0; + RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call); + RXRPC_ADD_ABORT(control, ctrllen, abort_code); + + msg.msg_name = &call->peer; + msg.msg_namelen = call->peer_len; + msg.msg_iov = NULL; + msg.msg_iovlen = 0; + msg.msg_control = control; + msg.msg_controllen = ctrllen; + msg.msg_flags = 0; + + sendmsg(call->endpoint->priv->fd, &msg, 0); + call->state = rx_call_completed; + call->completed = rx_call_locally_aborted; + } +} + +/* + * Terminate a call. + */ +static void af_rxrpc_end_call(struct rxrpc_call *call) +{ + if (call->priv->known_to_kernel) { + fprintf(stderr, "Call still known to kernel\n"); + abort(); + } + free(call->priv); +} + +struct rxrpc_transport af_rxrpc_transport = { + .id = rxrpc_transport_id, + .name = "AF_RXRPC", + .create_local_endpoint = af_rxrpc_create_local_endpoint, + .free_local_endpoint = af_rxrpc_free_local_endpoint, + .new_security = af_rxrpc_new_security, + .free_security = af_rxrpc_free_security, + .make_call = af_rxrpc_make_call, +#if 0 + .new_service = NULL, + .shutdown_service = NULL, + .accept_call = NULL, +#endif + .abort_call = af_rxrpc_abort_call, + .end_call = af_rxrpc_end_call, + .send_data = af_rxrpc_send_data, + .recv = af_rxrpc_recv, +}; diff --git a/lib/list.h b/lib/list.h new file mode 100644 index 0000000..e4882ce --- /dev/null +++ b/lib/list.h @@ -0,0 +1,286 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Simple doubly linked list implementation. + * + * Some of the internal functions ("__xxx") are useful when + * manipulating whole lists rather than single entries, as + * sometimes we already know the next/prev entries and we can + * generate better code by using them directly rather than + * using the generic single-entry routines. + */ + +#ifndef LIST_H +#define LIST_H + +struct list_head { + struct list_head *next, *prev; +}; + +static inline void init_list_head(struct list_head *list) +{ + list->next = list; + list->prev = list; +} + +/** + * list_empty - tests whether a list is empty + * @head: the list to test. + */ +static inline int list_empty(const struct list_head *head) +{ + return head->next == head; +} + +/* + * Insert a new entry between two known consecutive entries. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_add(struct list_head *new, + struct list_head *prev, + struct list_head *next) +{ + next->prev = new; + new->next = next; + new->prev = prev; + prev->next = new; +} + +/** + * list_add - add a new entry + * @new: new entry to be added + * @head: list head to add it after + * + * Insert a new entry after the specified head. + * This is good for implementing stacks. + */ +static inline void list_add(struct list_head *new, struct list_head *head) +{ + __list_add(new, head, head->next); +} + +/** + * list_add_tail - add a new entry + * @new: new entry to be added + * @head: list head to add it before + * + * Insert a new entry before the specified head. + * This is useful for implementing queues. + */ +static inline void list_add_tail(struct list_head *new, struct list_head *head) +{ + __list_add(new, head->prev, head); +} + +/* + * Delete a list entry by making the prev/next entries + * point to each other. + * + * This is only for internal list manipulation where we know + * the prev/next entries already! + */ +static inline void __list_del(struct list_head * prev, struct list_head * next) +{ + next->prev = prev; + prev->next = next; +} + +/** + * list_del - deletes entry from list. + * @entry: the element to delete from the list. + * Note: list_empty() on entry does not return true after this, the entry is + * in an undefined state. + */ +static inline void __list_del_entry(struct list_head *entry) +{ + __list_del(entry->prev, entry->next); +} + +static inline void list_del(struct list_head *entry) +{ + __list_del_entry(entry); + entry->next = (void *)0xa1; + entry->prev = (void *)0xa3; +} + +/** + * list_del_init - deletes entry from list and reinitialize it. + * @entry: the element to delete from the list. + */ +static inline void list_del_init(struct list_head *entry) +{ + __list_del_entry(entry); + init_list_head(entry); +} + +/** + * list_replace - replace old entry by new one + * @old : the element to be replaced + * @new : the new element to insert + * + * If @old was empty, it will be overwritten. + */ +static inline void list_replace(struct list_head *old, + struct list_head *new) +{ + new->next = old->next; + new->next->prev = new; + new->prev = old->prev; + new->prev->next = new; +} + +static inline void list_replace_init(struct list_head *old, + struct list_head *new) +{ + list_replace(old, new); + init_list_head(old); +} + +/** + * list_move - delete from one list and add as another's head + * @list: the entry to move + * @head: the head that will precede our entry + */ +static inline void list_move(struct list_head *list, struct list_head *head) +{ + __list_del_entry(list); + list_add(list, head); +} + +/** + * list_move_tail - delete from one list and add as another's tail + * @list: the entry to move + * @head: the head that will follow our entry + */ +static inline void list_move_tail(struct list_head *list, + struct list_head *head) +{ + __list_del_entry(list); + list_add_tail(list, head); +} + +static inline void __list_splice(const struct list_head *list, + struct list_head *prev, + struct list_head *next) +{ + struct list_head *first = list->next; + struct list_head *last = list->prev; + + first->prev = prev; + prev->next = first; + + last->next = next; + next->prev = last; +} + +/** + * list_splice - join two lists, this is designed for stacks + * @list: the new list to add. + * @head: the place to add it in the first list. + */ +static inline void list_splice(const struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) + __list_splice(list, head, head->next); +} + +/** + * list_splice_tail - join two lists, each list being a queue + * @list: the new list to add. + * @head: the place to add it in the first list. + */ +static inline void list_splice_tail(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) + __list_splice(list, head->prev, head); +} + +/** + * list_splice_init - join two lists and reinitialise the emptied list. + * @list: the new list to add. + * @head: the place to add it in the first list. + * + * The list at @list is reinitialised + */ +static inline void list_splice_init(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) { + __list_splice(list, head, head->next); + init_list_head(list); + } +} + +/** + * list_splice_tail_init - join two lists and reinitialise the emptied list + * @list: the new list to add. + * @head: the place to add it in the first list. + * + * Each of the lists is a queue. + * The list at @list is reinitialised + */ +static inline void list_splice_tail_init(struct list_head *list, + struct list_head *head) +{ + if (!list_empty(list)) { + __list_splice(list, head->prev, head); + init_list_head(list); + } +} + +#define offsetof(TYPE, MEMBER) ((size_t)&((TYPE *)0)->MEMBER) + +/** + * container_of - cast a member of a structure out to the containing structure + * @ptr: the pointer to the member. + * @type: the type of the container struct this is embedded in. + * @member: the name of the member within the struct. + * + */ +#define container_of(ptr, type, member) ({ \ + void *__mptr = (void *)(ptr); \ + ((type *)(__mptr - offsetof(type, member))); }) + +/** + * list_entry - get the struct for this entry + * @ptr: the &struct list_head pointer. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_head within the struct. + */ +#define list_entry(ptr, type, member) \ + container_of(ptr, type, member) + +/** + * list_first_entry - get the first element from a list + * @ptr: the list head to take the element from. + * @type: the type of the struct this is embedded in. + * @member: the name of the list_head within the struct. + * + * Note, that list is expected to be not empty. + */ +#define list_first_entry(ptr, type, member) \ + list_entry((ptr)->next, type, member) + +/** + * list_next_entry - get the next element in list + * @pos: the type * to cursor + * @member: the name of the list_head within the struct. + */ +#define list_next_entry(pos, member) \ + list_entry((pos)->member.next, typeof(*(pos)), member) + +/** + * list_for_each_entry - iterate over list of given type + * @pos: the type * to use as a loop cursor. + * @head: the head for your list. + * @member: the name of the list_head within the struct. + */ +#define list_for_each_entry(pos, head, member) \ + for (pos = list_first_entry(head, typeof(*pos), member); \ + &pos->member != (head); \ + pos = list_next_entry(pos, member)) + +#endif /* LIST_H */ diff --git a/lib/rxrpc.h b/lib/rxrpc.h index ad7ced8..67b6d6c 100644 --- a/lib/rxrpc.h +++ b/lib/rxrpc.h @@ -15,31 +15,108 @@ #include #include #include +#include #include #include #include #include #include +#include "list.h" + +extern bool rxrpc_debug_buffers; +extern bool rxrpc_debug_core; +extern bool rxrpc_debug_data; +extern bool rxrpc_debug_ops; +extern bool rxrpc_debug_transport; +extern bool rxrpc_debug_xdr; typedef unsigned int net_xdr_t; struct xdr_u64 { net_xdr_t hi, lo; }; +enum rxrpc_security_auth_level { + rxrpc_security_unset, + rxrpc_security_no_auth, + rxrpc_security_local_auth, + rxrpc_security_clear, + rxrpc_security_integrity_only, + rxrpc_security_encrypt, +}; + +enum rxrpc_error_source { + rxrpc_error_none, + rxrpc_error_remote_abort, + rxrpc_error_peer_busy, + rxrpc_error_from_system, + rxrpc_error_from_network, + rxrpc_error_from_parameters, +}; + struct rxrpc_result { + enum rxrpc_error_source source; + int error; + int abort_code; + unsigned short service_id; /* Service actually used (may be upgraded) */ + const char *op_name; +}; + +/* + * Local transport endpoint. + */ +struct rxrpc_local_endpoint { + struct rxrpc_transport_private *priv; + struct sockaddr_rxrpc addr; + socklen_t addr_len; +}; + +/* + * Security details to use. + */ +struct rxrpc_security { + struct rxrpc_security_private *priv; + enum rxrpc_security_auth_level level; + unsigned int key_name_len; + char key_name[]; /* Name of key to use */ }; /* * Call parameters. */ struct rxrpc_call_params { + struct rxrpc_local_endpoint *endpoint; + struct rxrpc_security *security; + struct sockaddr_rxrpc peer; + socklen_t peer_len; bool exclusive; bool upgrade_service; }; enum rx_call_state { + rx_call_cl_not_started, + rx_call_cl_encoding_params, + rx_call_cl_receiving_response, + + rx_call_sv_not_started, + rx_call_sv_receiving_params, + rx_call_sv_wait_for_no_MSG_MORE, + rx_call_sv_processing, + rx_call_sv_encoding_response_split, + rx_call_sv_encoding_response, + rx_call_sv_response_encoded, + rx_call_sv_waiting_for_final_ack, rx_call_completed, }; +enum rx_call_completion { + rx_call_incomplete, + rx_call_success, + rx_call_remotely_aborted, + rx_call_locally_aborted, + rx_call_net_error, + rx_call_local_error, + rx_call_rejected_busy, +}; + struct rxrpc_resv { unsigned int max_ioc; /* Size of msg_iov[] */ size_t alloc; /* Size of buffer allocated */ @@ -62,21 +139,59 @@ struct rxrpc_buffer { * RxRPC call. */ struct rxrpc_call { + struct rxrpc_call_private *priv; struct rxrpc_local_endpoint *endpoint; + struct sockaddr_rxrpc peer; + socklen_t peer_len; + unsigned short service_id; + const char *op_name; /* Name of the RPC operation */ enum rx_call_state state; + enum rx_call_completion completed; + uint32_t abort_code; + unsigned int error; + bool client_side; /* True if client side of call */ bool split; /* True if split-mode call */ + bool exclusive; + bool upgrade_service; + bool have_rx_timestamp; + + struct timespec rx_timestamp; /* Timestamp of first received packet */ + + void (*completion_func)(struct rxrpc_call *); /* Transmission phase state */ + unsigned long long total_tx_size; /* Total expected transmission size */ uint32_t encode_error; /* Receive phase state */ + bool rx_ended; /* True if all data read */ bool dec_oom; /* True if we got ENOMEM during decode */ - uint32_t decode_error; + uint32_t decode_error; /* Abort code if there's a decode error. */ + unsigned int dec_index; + struct list_head dec_buffers; /* List of received buffers */ + size_t dec_amount; /* Amount of data in dec_buffers */ + void (*dec_func)(struct rxrpc_call *); }; -extern bool rxrpc_receive(struct rxrpc_local_endpoint *endpoint, bool nowait, - struct rxrpc_result *result); +#define RXGEN_BUFFER_SIZE 1024 + +struct rxrpc_rx_buf { + struct list_head link; /* Link in buffer queue */ + uint8_t *buf; + unsigned short len; /* Amount of data in buffer */ + unsigned short used; /* How much data used */ + unsigned int index; /* Index in buffer list */ +}; + +extern struct rxrpc_local_endpoint *rxrpc_new_local_endpoint(const struct sockaddr_rxrpc *local_sa, + socklen_t local_salen); +extern void rxrpc_free_local_endpoint(struct rxrpc_local_endpoint *endpoint); +extern struct rxrpc_security *rxrpc_new_security(struct rxrpc_local_endpoint *endpoint, + const char *key_name, + enum rxrpc_security_auth_level level, + struct rxrpc_result *_result); +extern void rxrpc_free_security(struct rxrpc_security *security); extern struct rxrpc_buffer *rxrpc_alloc_enc_buffer(struct rxrpc_resv *resv); extern void rxrpc_free_buffer(struct rxrpc_buffer *buf); @@ -84,6 +199,8 @@ extern struct rxrpc_call *rxrpc_make_call(struct rxrpc_call_params *z_params, struct rxrpc_buffer *z_request, const char *op_name, struct rxrpc_result *_result); +extern bool rxrpc_receive(struct rxrpc_local_endpoint *endpoint, bool nowait, + struct rxrpc_result *result); extern void rxrpc_receive_reply(struct rxrpc_call *call); extern bool rxrpc_request_received(struct rxrpc_call *call); extern bool rxrpc_read(struct rxrpc_call *z_call, void *buffer, size_t buffer_size); @@ -91,7 +208,7 @@ extern bool rxrpc_send_data(struct rxrpc_call *call, struct rxrpc_buffer *buf, b struct rxrpc_result *_result); extern void rxrpc_abort_call(struct rxrpc_call *call, uint32_t abort_code); extern bool rxrpc_end_call(struct rxrpc_call *call, uint32_t abort_code); -extern bool rxrpc_call_failed(struct rxrpc_call *call); +extern bool rxrpc_call_failed(struct rxrpc_result *result); /* * XDR encoding size calculation routines @@ -212,4 +329,24 @@ static inline uint64_t rxrpc_dec_u64(struct rxrpc_call *call) return x; } +/* + * Error handling routines + */ +static inline __attribute__((format(printf,2,3))) +bool rxrpc_syserror(struct rxrpc_result *result, const char *fmt, ...) +{ + result->source = rxrpc_error_from_system; + result->error = errno; + result->abort_code = 0; + return false; +} + +static inline bool rxrpc_nomem(struct rxrpc_result *result) +{ + result->source = rxrpc_error_from_system; + result->error = ENOMEM; + result->abort_code = 0; + return false; +} + #endif /* RXRPC_H */ diff --git a/lib/rxrpc_core.c b/lib/rxrpc_core.c new file mode 100644 index 0000000..de2818f --- /dev/null +++ b/lib/rxrpc_core.c @@ -0,0 +1,326 @@ +/* rxrpc core + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#include +#include +#include +#include +#include "rxrpc.h" +#include "transport.h" + +bool rxrpc_debug_buffers; /* Debug buffer handling */ +bool rxrpc_debug_core; /* Debug core management */ +bool rxrpc_debug_data; /* Dump data */ +bool rxrpc_debug_ops; /* Debug operations */ +bool rxrpc_debug_transport; /* Debug transport */ +bool rxrpc_debug_xdr; /* Debug XDR encoding/decoding */ + +#define debug(fmt, ...) \ + do { if (rxrpc_debug_core) printf("CORE: "fmt, ## __VA_ARGS__); } while (0) +#define debug_op(fmt, ...) \ + do { if (rxrpc_debug_ops) printf("OP: "fmt, ## __VA_ARGS__); } while (0) + +static struct rxrpc_transport *rxrpc_transport = &af_rxrpc_transport; + +/* + * Set up a new transport + */ +struct rxrpc_local_endpoint *rxrpc_new_local_endpoint(const struct sockaddr_rxrpc *local_addr, + socklen_t local_len) +{ + struct rxrpc_local_endpoint *endpoint; + + if (local_len > sizeof(endpoint->addr)) { + errno = EINVAL; + return NULL; + } + + if (!rxrpc_transport) { + fprintf(stderr, "No Rx transport loaded\n"); + errno = EINVAL; + return NULL; + } + + endpoint = calloc(1, sizeof(*endpoint)); + if (!endpoint) + return NULL; + + if (local_len > 0) + memcpy(&endpoint->addr, local_addr, local_len); + + if (!rxrpc_transport->create_local_endpoint(endpoint)) { + free(endpoint); + return NULL; + } + + return endpoint; +} + +/* + * Destroy a transport + */ +void rxrpc_free_local_endpoint(struct rxrpc_local_endpoint *endpoint) +{ + rxrpc_transport->free_local_endpoint(endpoint); + free(endpoint); +} + +/* + * Set up a new security context. + */ +struct rxrpc_security *rxrpc_new_security(struct rxrpc_local_endpoint *endpoint, + const char *key_name, + enum rxrpc_security_auth_level level, + struct rxrpc_result *_result) +{ + struct rxrpc_security *security; + size_t len = 0; + + if (level < rxrpc_security_unset || + level > rxrpc_security_encrypt) { + _result->source = rxrpc_error_from_parameters; + _result->error = EINVAL; + return NULL; + } + + if (key_name) + len = strlen(key_name) + 1; + + security = calloc(1, sizeof(*security) + len); + if (!security) { + rxrpc_nomem(_result); + return NULL; + } + + security->level = level; + if (key_name) { + security->key_name_len = len - 1; + memcpy(security->key_name, key_name, len); + } + + if (!rxrpc_transport->new_security(endpoint, security, _result)) { + free(security); + return NULL; + } + + return security; +} + +/* + * Destroy a security context. + */ +void rxrpc_free_security(struct rxrpc_security *security) +{ + if (security) { + rxrpc_transport->free_security(security); + free(security); + } +} + +/* + * Set up a client call and prepare for encoding. + */ +struct rxrpc_call *rxrpc_make_call(struct rxrpc_call_params *params, + struct rxrpc_buffer *buf, + const char *op_name, + struct rxrpc_result *_result) +{ + struct rxrpc_call *call; + + _result->op_name = op_name; + + if (params->peer_len < 5 * 2 || + params->peer_len > sizeof(call->peer)) { + errno = EDESTADDRREQ; + rxrpc_syserror(_result, "Peer address too small"); + return NULL; + } + + if (params->peer.srx_family != AF_RXRPC) { + errno = EPROTOTYPE; + rxrpc_syserror(_result, "Peer address family unsupported"); + return NULL; + } + + call = calloc(1, sizeof(*call)); + if (!call) { + rxrpc_nomem(_result); + return NULL; + } + + call->endpoint = params->endpoint; + call->service_id = params->peer.srx_service; + call->state = rx_call_cl_not_started; + call->client_side = true; + call->total_tx_size = buf->size; + call->exclusive = params->exclusive; + call->upgrade_service = params->upgrade_service; + call->op_name = op_name; + call->peer_len = params->peer_len; + memcpy(&call->peer, ¶ms->peer, params->peer_len); + + call->dec_func = rxrpc_receive_reply; + + init_list_head(&call->dec_buffers); + + debug_op("Send %s\n", op_name); + if (!rxrpc_transport->make_call(call, _result)) + goto error; + + return call; + +error: + free(call); + return NULL; +} + +/* + * Abort a call. + */ +void rxrpc_abort_call(struct rxrpc_call *call, uint32_t abort_code) +{ + rxrpc_transport->abort_call(call, abort_code); + call->state = rx_call_completed; + call->completed = rx_call_locally_aborted; +} + +/* + * End a call, aborting it if necessary. + */ +bool rxrpc_end_call(struct rxrpc_call *call, uint32_t abort_code) +{ + enum rx_call_completion completed; + struct rxrpc_rx_buf *b; + + if (call->state != rx_call_completed) + rxrpc_abort_call(call, abort_code); + rxrpc_transport->end_call(call); + + while (!list_empty(&call->dec_buffers)) { + b = list_entry(call->dec_buffers.next, struct rxrpc_rx_buf, link); + list_del(&b->link); + free(b->buf); + free(b); + } + + completed = call->completed; + free(call); + return completed == rx_call_success; +} + +/* + * Send buffered data. + */ +bool rxrpc_send_data(struct rxrpc_call *call, struct rxrpc_buffer *buf, bool more, + struct rxrpc_result *_result) +{ + debug("-->rxrpc_send_data(%u,%u)\n", call->state, more); + + /* Switch into encode state */ + switch (call->state) { + case rx_call_cl_not_started: + case rx_call_sv_processing: + call->state++; + break; + case rx_call_cl_encoding_params: + case rx_call_sv_encoding_response: + break; + default: + fprintf(stderr, "RxRPC: Send in bad call state (%d)\n", call->state); + abort(); + } + + if (!rxrpc_transport->send_data(call, buf, more, _result)) + return false; + + if (!more) { + call->state++; + if (call->state == rx_call_cl_encoding_params || + call->state == rx_call_sv_encoding_response) + call->state++; + } + + return true; +} + +/* + * Reply decoder. + */ +void rxrpc_receive_reply(struct rxrpc_call *call) +{ + if (call->state == rx_call_completed && + call->completed == rx_call_success) + debug("Reply received\n"); +} + +/* + * End of split notification. + */ +void rxrpc_end_split(struct rxrpc_call *call, + struct rxrpc_result *_result) +{ + switch (call->state) { + case rx_call_cl_encoding_params: + rxrpc_transport->send_data(call, NULL, false, _result); + break; + + case rx_call_cl_receiving_response: + call->dec_func = rxrpc_receive_reply; + call->dec_func(call); + break; + + default: + fprintf(stderr, "rxrpc_end_split() called in state %u\n", + call->state); + return; + } +} + +/* + * End of reception of request. There should be no more data in the buffers. + */ +bool rxrpc_request_received(struct rxrpc_call *call) +{ + switch (call->state) { + case rx_call_sv_receiving_params: + + default: + fprintf(stderr, "rxrpc_request_received() called in state %u\n", + call->state); + return false; + } +} + +/* + * Receive data from the local endpoint. + */ +bool rxrpc_receive(struct rxrpc_local_endpoint *endpoint, bool nowait, + struct rxrpc_result *result) +{ + return rxrpc_transport->recv(endpoint, nowait, result); +} + +bool rxrpc_call_failed(struct rxrpc_result *result) +{ + char buf[128]; + + if (result->source == rxrpc_error_remote_abort) { + fprintf(stderr, "OP: %s aborted: %d\n", + result->op_name, result->abort_code); + } else { + buf[0] = 0; + strerror_r(result->error, buf, sizeof(buf)); + fprintf(stderr, "OP: %s failed: %s\n", + result->op_name, buf); + } + + return false; +} diff --git a/lib/rxrpc_xdr.c b/lib/rxrpc_xdr.c new file mode 100644 index 0000000..ed504d1 --- /dev/null +++ b/lib/rxrpc_xdr.c @@ -0,0 +1,256 @@ +/* XDR coding and decoding. + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#include +#include +#include +#include "rxrpc.h" + +#define debug(fmt, ...) \ + do { if (rxrpc_debug_xdr) printf("XDR: "fmt, ## __VA_ARGS__); } while (0) +#define debug_buffer(fmt, ...) \ + do { if (rxrpc_debug_buffers) printf("BUF: "fmt, ## __VA_ARGS__); } while (0) + +#define UAENOMEM 0x2f6df0b + +static inline size_t rxrpc_calc_padding(size_t x) +{ + return (4 - (x & 3)) & 3; +} + +/* + * Set up an encoded buffer. + */ +struct rxrpc_buffer *rxrpc_alloc_enc_buffer(struct rxrpc_resv *resv) +{ + struct rxrpc_buffer *buf; + size_t alloc; + + buf = calloc(1, sizeof(struct rxrpc_buffer)); + if (!buf) + return NULL; + + if (resv->max_ioc == 0) + abort(); + buf->msg_iov = calloc(resv->max_ioc, sizeof(struct iovec)); + if (!buf->msg_iov) + goto error; + + alloc = (resv->alloc + 3) / 4; + buf->enc_buf = calloc(alloc, sizeof(net_xdr_t)); + if (!buf->enc_buf) + goto error; + + buf->size = resv->alloc + resv->additional; + buf->enc_buf_p = buf->enc_buf; + buf->enc_buf_end = buf->enc_buf + alloc; + buf->msg_iov[0].iov_base = buf->enc_buf_p; + debug_buffer("Tx ALLOC s=%zx a=%zx ioc=%u\n", buf->size, resv->alloc, resv->max_ioc); + return buf; + +error: + rxrpc_free_buffer(buf); + return NULL; +} + +void rxrpc_free_buffer(struct rxrpc_buffer *buf) +{ + if (buf) { + free(buf->msg_iov); + free(buf->enc_buf); + free(buf); + } +} + +/* + * Advance the iov array by two slots, leaving a slot for a blob. + */ +static void rxrpc_enc_advance_iov_by_2(struct rxrpc_buffer *buf) +{ + unsigned int ioc = buf->ioc; + + /* Note that iov_base may not be 4-bytes aligned if it starts with blob + * padding. + */ + buf->msg_iov[ioc].iov_len = + (void *)buf->enc_buf_p - buf->msg_iov[ioc].iov_base; + + ioc += 2; + buf->msg_iov[ioc].iov_base = buf->enc_buf_p; + buf->ioc = ioc; +} + +/* + * Encode an opaque blob to XDR. We copy in the blob if it's small, otherwise + * we assign it its own iov slot. + */ +void rxrpc_enc_opaque(struct rxrpc_buffer *buf, + const void *blob, unsigned int size, unsigned int max) +{ + unsigned int ioc; + size_t padsize = rxrpc_calc_padding(size); + size_t total = size + padsize; + + rxrpc_enc(buf, size); + + if (rxrpc_resv_opaque_inline(size, max)) { + if (size & 3) + buf->enc_buf_p[size >> 2] = 0; /* Clear padding */ + memcpy(buf->enc_buf_p, blob, size); + buf->enc_buf_p += total / sizeof(net_xdr_t); + } else { + rxrpc_enc_advance_iov_by_2(buf); + ioc = buf->ioc; + buf->msg_iov[ioc - 1].iov_len = size; + buf->msg_iov[ioc - 1].iov_base = (void *)blob; + if (padsize > 0) { + rxrpc_enc(buf, 0); + buf->msg_iov[ioc].iov_base += 4 - padsize; + buf->msg_iov[ioc].iov_len += padsize; + } + } +} + +static bool rxrpc_dec_abort(struct rxrpc_call *call, unsigned int abort_code) +{ + call->decode_error = abort_code; + rxrpc_abort_call(call, call->decode_error); + return false; +} + +static bool rxrpc_dec_underrun(struct rxrpc_call *call, size_t size) +{ + fprintf(stderr, "rxrpc XDR decoder underrun (%zx/%zx)\n", + call->dec_amount, size); + return rxrpc_dec_abort(call, call->client_side ? RXGEN_CC_UNMARSHAL : RXGEN_SS_UNMARSHAL); +} + +/* + * Read data from the receive buffers. + */ +bool rxrpc_read(struct rxrpc_call *call, void *buffer, size_t buffer_size) +{ + struct rxrpc_rx_buf *b; + unsigned int seg; + size_t offset = 0; + int i; + + debug("READ %zu/%zu\n", buffer_size, call->dec_amount); + + if (buffer_size > call->dec_amount) { + if (call->state == rx_call_completed && + call->completed != rx_call_success) + return false; + if (call->rx_ended) + return rxrpc_dec_underrun(call, buffer_size); + printf("Partial fail\n"); + return true; + } + + while (offset < buffer_size && !list_empty(&call->dec_buffers)) { + b = list_entry(call->dec_buffers.next, struct rxrpc_rx_buf, link); + + seg = b->len - b->used; + if (seg > buffer_size - offset) + seg = buffer_size - offset; + + debug("B[%u] %zu/%zu from %u/%zu\n", + b->index, offset, buffer_size, seg, call->dec_amount); + + memcpy(buffer + offset, b->buf + b->used, seg); + offset += seg; + b->used += seg; + call->dec_amount -= seg; + + if (b->used == b->len) { + list_del(&b->link); + free(b->buf); + free(b); + } else { + if (offset != buffer_size) + abort(); + } + } + + if (offset < buffer_size) { + fprintf(stderr, "Inconsistent amount of buffered rx data\n"); + abort(); + } + + if (list_empty(&call->dec_buffers) && call->dec_amount != 0) { + fprintf(stderr, "call->dec_amount should be zero\n"); + abort(); + } + + if (rxrpc_debug_xdr) { + seg = buffer_size <= 32 ? buffer_size : 32; + debug(""); + for (i = 0; i < seg; i++) + printf("%02x", ((unsigned char *)buffer)[i]); + printf("\n"); + } + + return true; +} + +/* + * Decode a 32-bit integer. + */ +uint32_t rxrpc_dec(struct rxrpc_call *call) +{ + uint32_t x; + + if (rxrpc_read(call, &x, sizeof(x))) + return ntohl(x); + return 0; +} + +/* + * Decode a 32-bit integer and check limit. + */ +uint32_t rxrpc_dec_limit(struct rxrpc_call *call, size_t limit) +{ + uint32_t x; + + x = rxrpc_dec(call); + if (x > limit && !call->decode_error) { + fprintf(stderr, "XDR: Decoded value exceeded limit\n"); + call->decode_error = call->client_side ? RXGEN_CC_UNMARSHAL : RXGEN_SS_UNMARSHAL; + rxrpc_abort_call(call, call->decode_error); + return 0; + } + + return x; +} + +/* + * Decode a blob. We allocate enough space to hold the padding and/or a + * NUL-termination. + */ +void *rxrpc_dec_blob(struct rxrpc_call *call, size_t len) +{ + unsigned char *p; + size_t padded_len = xdr_round_up(len); + + p = malloc(padded_len + (padded_len == len ? 1 : 0)); + if (!p) { + rxrpc_dec_abort(call, UAENOMEM); + return NULL; + } + + if (!rxrpc_read(call, p, padded_len)) { + free(p); + return NULL; + } + + p[len] = 0; + return p; +} diff --git a/lib/transport.h b/lib/transport.h new file mode 100644 index 0000000..897b89c --- /dev/null +++ b/lib/transport.h @@ -0,0 +1,97 @@ +/* Access points into the dynamically loaded Rx transport module + * + * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#ifndef _RXRPC_TRANSPORT_H +#define _RXRPC_TRANSPORT_H + +#include +#include +#include +#include "rxrpc.h" + +enum rxrpc_recv_condition { + rxrpc_recv_more, + rxrpc_recv_final_ack, + rxrpc_recv_complete, + rxrpc_recv_complete_now_send, + rxrpc_recv_abort, + rxrpc_recv_net_error, + rxrpc_recv_local_error, + rxrpc_recv_busy, +}; + +#define rxrpc_transport_id "rxrpc-utils tran" + +struct rxrpc_transport_private; +struct rxrpc_security_private; +struct rxrpc_service_private; +struct rxrpc_call_private; + +#if 0 +typedef void (*rxrpc_incoming_call_func_t)(struct rxrpc_service *service, + void *caller_data, + const struct sockaddr_rxrpc *peer, + socklen_t *peerlen); +#endif + +struct rxrpc_transport { + char id[24]; /* = rxrpc_transport_id */ + char name[16]; /* Name of transport */ + + bool (*create_local_endpoint)(struct rxrpc_local_endpoint *endpoint); + + void (*free_local_endpoint)(struct rxrpc_local_endpoint *endpoint); + + bool (*new_security)(struct rxrpc_local_endpoint *endpoint, + struct rxrpc_security *security, + struct rxrpc_result *_result); + + void (*free_security)(struct rxrpc_security *security); + + bool (*make_call)(struct rxrpc_call *call, + struct rxrpc_result *_result); + +#if 0 + int (*new_service)(struct rxrpc_local_endpoint *endpoint, + struct rxrpc_service *service, + rxrpc_incoming_call_func_t new_call_func, + void *caller_data, + uint16_t local_port, + uint16_t local_service, + struct rxrpc_security_private **permits); + + void (*shutdown_service)(struct rxrpc_service_private *service); + + int (*accept_call)(struct rxrpc_call *call, + struct rxrpc_service *service, + void *caller_data, + struct sockaddr_rxrpc *_sa, + socklen_t *_salen); +#endif + + void (*abort_call)(struct rxrpc_call *call, + uint32_t abort_code); + + void (*end_call)(struct rxrpc_call *call); + + bool (*send_data)(struct rxrpc_call *call, + struct rxrpc_buffer *buf, + bool more, + struct rxrpc_result *_result); + + bool (*recv)(struct rxrpc_local_endpoint *endpoint, + bool nowait, + struct rxrpc_result *result); +}; + +extern struct rxrpc_transport af_rxrpc_transport; + +#endif /* _RXRPC_TRANSPORT_H */ -- 2.50.1