From 74dcb5b2d2924cda4a005f8d53960ad27a740d72 Mon Sep 17 00:00:00 2001 From: David Howells Date: Fri, 25 Sep 2015 23:21:50 +0100 Subject: [PATCH] Dynamically load the transport interface as a shared library Make it possible to change the transport interface by abstracting it out into a shared library and loading it dynamically by name. kafs/main.py currently loads it by specific name: rpc.load_kafs_transport("/usr/local/lib64/kafs_af_rxrpc.so") but this needs altering so that the name of the transport can be supplied by the user. File kafs/transport.h defines the interface as a table of function pointers defined as: struct kafs_transport kafs_transport = { .id = kafs_transport_id, .name = "" ... }; The ->new_service(), ->shutdown_service() and ->accept_call() methods aren't currently used. Each transport interface can define the following structures in any way it sees fit: struct kafs_transport_handle; struct kafs_security_handle; struct kafs_connection_handle; struct kafs_service_handle; struct kafs_call_handle; As far as kafs-utils is concerned they're opaque handles. Note that the interface is expected to invoke the data_ready_func function passed to ->make_call() when data becomes available. At that point, the data_ready_func function will call ->recv_data() to load data into the supplied buffers. ->poll_connection() is called to see if there's any data and potentially to wait for it if there isn't. The AF_RXRPC transport interface is now separated from core kafs-utils code and is currently not built or installed by the setup.py script. This will need fixing in some manner. Signed-off-by: David Howells --- Makefile | 2 +- af_rxrpc/Makefile | 2 + af_rxrpc/af_rxrpc.c | 675 +++++++++++++++++++++++++++ {kafs => af_rxrpc}/af_rxrpc.h | 0 kafs/argparse.py | 58 +-- kafs/commands/pts/setfields.py | 2 +- kafs/lib/cell.py | 15 +- kafs/lib/parse_setrestart_time.py | 2 +- kafs/main.py | 18 +- kafs/py_rxconn.c | 15 +- kafs/py_rxgen.c | 15 +- kafs/py_rxgen.h | 8 + kafs/py_rxsplit.c | 10 +- kafs/py_rxtrans.c | 154 +++++++ kafs/rxgen.h | 31 +- kafs/{af_rxrpc.c => transport.c} | 743 +++++++++++++----------------- kafs/transport.h | 124 +++++ rxgen/emit_c_sync_funcs.py | 2 +- rxgen/emit_py_module.py | 3 + rxgen/emit_py_sync_funcs.py | 2 +- setup.py | 3 +- 21 files changed, 1385 insertions(+), 499 deletions(-) create mode 100644 af_rxrpc/Makefile create mode 100644 af_rxrpc/af_rxrpc.c rename {kafs => af_rxrpc}/af_rxrpc.h (100%) create mode 100644 kafs/py_rxtrans.c rename kafs/{af_rxrpc.c => transport.c} (57%) create mode 100644 kafs/transport.h diff --git a/Makefile b/Makefile index dfbd32d..6674bc3 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -build: +run: python3 setup.py build rpm: diff --git a/af_rxrpc/Makefile b/af_rxrpc/Makefile new file mode 100644 index 0000000..be28d58 --- /dev/null +++ b/af_rxrpc/Makefile @@ -0,0 +1,2 @@ +kafs_af_rxrpc.so: af_rxrpc.c af_rxrpc.h Makefile ../kafs/transport.h + $(CC) -shared -fPIC -O2 -g -Wall -o $@ af_rxrpc.c -I../kafs diff --git a/af_rxrpc/af_rxrpc.c b/af_rxrpc/af_rxrpc.c new file mode 100644 index 0000000..faebfe7 --- /dev/null +++ b/af_rxrpc/af_rxrpc.c @@ -0,0 +1,675 @@ +/* AF_RXRPC driver + * + * Copyright (C) 2014 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#define _XOPEN_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include "transport.h" +#include "af_rxrpc.h" + +#define RXGEN_CALL_MAGIC 0x52584745U +#define RXGEN_BUF_MAGIC (0x52420000U | __LINE__) +#define RXGEN_BUF_DEAD (0x6b6bU | __LINE__) + +#define debug(fmt, ...) do { if (0) printf("AFRX: "fmt, ## __VA_ARGS__); } while (0) + +uint32_t rxgen_dec_padding_sink; + +struct usage { + int count; +}; + +static inline void usage_set(struct usage *usage) +{ + usage->count = 1; +} + +static inline void usage_get(struct usage *usage) +{ + usage->count++; +} + +static inline int usage_put_and_test(struct usage *usage) +{ + return --usage->count == 0; +} + +struct kafs_transport_handle { + struct usage usage; + struct sockaddr_rxrpc local; +}; + +struct kafs_security_handle { + struct usage usage; + struct kafs_transport_handle *transport; + enum kafs_connection_auth_level level; + unsigned min_sec_level; + unsigned cell_name_len; + char cell_name[]; +}; + +struct kafs_connection_handle { + struct usage usage; + int fd; + struct kafs_transport_handle *transport; + struct kafs_security_handle *security; + struct sockaddr_rxrpc peer; +}; + +struct kafs_call_handle { + struct usage usage; + struct kafs_connection_handle *conn; + kafs_data_ready_func data_ready_func; + void *caller_data; + int known_to_kernel; +}; + +/* + * dump the control messages + */ +static __attribute__((unused)) +void dump_cmsg(struct msghdr *msg) +{ + struct cmsghdr *cmsg; + unsigned long user_id; + unsigned char *p; + int abort_code; + int n; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { + n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); + p = CMSG_DATA(cmsg); + + printf("CMSG: %zu: ", cmsg->cmsg_len); + + if (cmsg->cmsg_level == SOL_RXRPC) { + switch (cmsg->cmsg_type) { + case RXRPC_USER_CALL_ID: + printf("RXRPC_USER_CALL_ID: "); + if (n != sizeof(user_id)) + goto dump_data; + memcpy(&user_id, p, sizeof(user_id)); + printf("%lx\n", user_id); + continue; + + case RXRPC_ABORT: + printf("RXRPC_ABORT: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%d\n", abort_code); + continue; + + case RXRPC_ACK: + printf("RXRPC_ACK"); + if (n != 0) + goto dump_data_colon; + goto print_nl; + + case RXRPC_RESPONSE: + printf("RXRPC_RESPONSE"); + if (n != 0) + goto dump_data_colon; + goto print_nl; + + case RXRPC_NET_ERROR: + printf("RXRPC_NET_ERROR: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%s\n", strerror(abort_code)); + continue; + + case RXRPC_BUSY: + printf("RXRPC_BUSY"); + if (n != 0) + goto dump_data_colon; + goto print_nl; + + case RXRPC_LOCAL_ERROR: + printf("RXRPC_LOCAL_ERROR: "); + if (n != sizeof(abort_code)) + goto dump_data; + memcpy(&abort_code, p, sizeof(abort_code)); + printf("%s\n", strerror(abort_code)); + continue; + + default: + break; + } + } + + printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type); + + dump_data_colon: + printf(": "); + dump_data: + printf("{"); + for (; n > 0; n--, p++) + printf("%02x", *p); + + print_nl: + printf("}\n"); + } +} + +/* + * Initialise the transport module + */ +static struct kafs_transport_handle *af_rxrpc_init_transport(const struct sockaddr *sa, + socklen_t salen) +{ + struct kafs_transport_handle *trans; + switch (sa->sa_family) { + case AF_INET: + case AF_INET6: + break; + default: + fprintf(stderr, "Unsupported endpoint type\n"); + errno = EPROTOTYPE; + return NULL; + } + + trans = calloc(1, sizeof(*trans)); + if (!trans) + return NULL; + + usage_set(&trans->usage); + trans->local.srx_family = AF_RXRPC; + trans->local.srx_service = 0; + trans->local.transport_type = SOCK_DGRAM; + trans->local.transport_len = salen; + memcpy(&trans->local.transport, sa, salen); + + return trans; +} + +/* + * Free the transport module + */ +static void af_rxrpc_free_transport(struct kafs_transport_handle *trans) +{ + if (usage_put_and_test(&trans->usage)) + free(trans); +} + +/* + * Set up a security descriptor + */ +static struct kafs_security_handle *af_rxrpc_new_security( + struct kafs_transport_handle *transport, + const char *cell_name, + enum kafs_connection_auth_level level) +{ + struct kafs_security_handle *security; + size_t namelen = strlen(cell_name); + + security = calloc(1, sizeof(*security) + namelen + 1); + if (!security) + return NULL; + + usage_set(&security->usage); + security->transport = transport; + security->cell_name_len = namelen; + security->level = level; + switch (level) { + case kafs_connection_no_auth: + break; + case kafs_connection_local_auth: + fprintf(stderr, "Local auth not supported by AF_RXRPC transport\n"); + errno = -EINVAL; + free(security); + return NULL; + case kafs_connection_clear: + security->min_sec_level = RXRPC_SECURITY_PLAIN; + break; + case kafs_connection_integrity_only: + security->min_sec_level = RXRPC_SECURITY_AUTH; + break; + case kafs_connection_encrypt: + security->min_sec_level = RXRPC_SECURITY_ENCRYPT; + break; + } + memcpy(security->cell_name, cell_name, namelen + 1); + usage_get(&security->transport->usage); + return security; +} + +/* + * Free a security descriptor + */ +static void af_rxrpc_free_security(struct kafs_security_handle *security) +{ + if (usage_put_and_test(&security->usage)) { + af_rxrpc_free_transport(security->transport); + free(security); + } +} + +/* + * Set up a new connection + */ +static struct kafs_connection_handle *af_rxrpc_open_connection( + struct kafs_transport_handle *transport, + struct kafs_security_handle *security, + const struct sockaddr *sa, + socklen_t salen, + uint16_t service, + int exclusive_connection) +{ + struct kafs_connection_handle *conn; + int ret; + + if (salen > sizeof(conn->peer.transport)) { + errno = EINVAL; + return NULL; + } + + conn = calloc(1, sizeof(*conn)); + if (!conn) + return NULL; + usage_set(&conn->usage); + conn->transport = transport; + conn->security = security; + + conn->peer.srx_family = AF_RXRPC; + conn->peer.srx_service = service; + conn->peer.transport_type = SOCK_DGRAM; + conn->peer.transport_len = salen; + memcpy(&conn->peer.transport, sa, salen); + + /* Open up a socket for talking to the AF_RXRPC module */ + conn->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET); + if (conn->fd < 0) + goto error_conn; + + if (exclusive_connection) { + ret = setsockopt(conn->fd, + SOL_RXRPC, RXRPC_EXCLUSIVE_CONNECTION, + NULL, 0); + if (ret == -1) + goto error_conn; + } + + if (security) { + ret = setsockopt(conn->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL, + &security->min_sec_level, + sizeof(security->min_sec_level)); + if (ret == -1) + goto error_conn; + + ret = setsockopt(conn->fd, SOL_RXRPC, RXRPC_SECURITY_KEY, + security->cell_name, security->cell_name_len); + if (ret == -1) + goto error_conn; + } + + /* Bind an address to the local endpoint */ + ret = bind(conn->fd, (struct sockaddr *)&conn->transport->local, + sizeof(conn->transport->local)); + if (ret < 0) + goto error_fd; + + usage_get(&security->usage); + usage_get(&transport->usage); + return conn; + +error_fd: + close(conn->fd); +error_conn: + free(conn); + return NULL; +} + +/* + * Close an RxRPC client connection. This will cause all outstanding + * operations to be aborted by the kernel.. + */ +static void af_rxrpc_close_connection(struct kafs_connection_handle *conn) +{ + if (usage_put_and_test(&conn->usage)) { + close(conn->fd); + af_rxrpc_free_security(conn->security); + af_rxrpc_free_transport(conn->transport); + free(conn); + } +} + +/* + * Allocate a call handle for a new call. + */ +static struct kafs_call_handle *af_rxrpc_make_call( + struct kafs_connection_handle *conn, + kafs_data_ready_func data_ready_func, + void *caller_data) +{ + struct kafs_call_handle *call; + + call = calloc(1, sizeof(*call)); + if (!call) + return NULL; + + usage_set(&call->usage); + call->conn = conn; + call->data_ready_func = data_ready_func; + call->caller_data = caller_data; + usage_get(&conn->usage); + return call; +} + +/* + * Send buffered data. + */ +static int af_rxrpc_send_data(struct kafs_call_handle *call, + struct iovec *iov, + int ioc, + int more) +{ + struct msghdr msg; + size_t ctrllen; + unsigned char control[128]; + int ret, i; + + /* Request an operation */ + ctrllen = 0; + RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call); + + msg.msg_name = &call->conn->peer; + msg.msg_namelen = sizeof(call->conn->peer); + msg.msg_iov = iov; + msg.msg_iovlen = ioc; + msg.msg_control = control; + msg.msg_controllen = ctrllen; + msg.msg_flags = 0; + + if (more) + more = MSG_MORE; + + //dump_cmsg(&msg); + for (i = 0; i < msg.msg_iovlen; i++) + debug("IOV[%02u] %04zu %p\n", + i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base); + + /* Send the data */ + ret = sendmsg(call->conn->fd, &msg, more); + debug("SENDMSG: %d%s\n", ret, more ? " [more]" : ""); + if (ret >= 0) + call->known_to_kernel = 1; + return ret; +} + +/* + * Receive data from a connection and pass it to the appropriate call. + */ +static int af_rxrpc_recv_mux(struct kafs_connection_handle *conn) +{ + struct kafs_call_handle *call; + struct sockaddr_rxrpc srx; + struct cmsghdr *cmsg; + struct msghdr msg; + struct iovec iov[1]; + unsigned char control[128]; + uint32_t tmpbuf[1]; + int ret; + + for (;;) { + /* Peek at the next message */ + iov[0].iov_base = &tmpbuf; + iov[0].iov_len = sizeof(tmpbuf); + + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = 1; + msg.msg_name = &srx; + msg.msg_namelen = sizeof(srx); + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + msg.msg_flags = 0; + + ret = recvmsg(conn->fd, &msg, MSG_PEEK | MSG_DONTWAIT); + debug("RECVMSG PEEK: %d\n", ret); + if (ret == -1) { + if (errno == EAGAIN || + errno == ENODATA) + return 0; + return -1; + } + //dump_cmsg(&msg); + + /* Find the call ID. */ + call = NULL; + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + unsigned char *p; + + if (cmsg->cmsg_level != SOL_RXRPC || + cmsg->cmsg_type != RXRPC_USER_CALL_ID) + continue; + + p = CMSG_DATA(cmsg); + call = *(struct kafs_call_handle **)p; + goto found_call; + } + abort(); + + found_call: + if (!call->known_to_kernel) { + fprintf(stderr, "Unexpected packet for dead call\n"); + abort(); + } + call->data_ready_func(call, call->caller_data); + } +} + +/* + * Receive data from a transport. + */ +static int af_rxrpc_recv_data(struct kafs_call_handle *call, + struct iovec *iov, + int ioc, + enum kafs_recv_condition *_cond, + uint32_t *_supplementary_info) +{ + struct sockaddr_rxrpc srx; + struct cmsghdr *cmsg; + struct msghdr msg; + unsigned char control[128]; + enum kafs_recv_condition cond; + uint32_t supplementary_info; + int ret; + + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = ioc; + msg.msg_name = &srx; + msg.msg_namelen = sizeof(srx); + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + msg.msg_flags = 0; + + ret = recvmsg(call->conn->fd, &msg, MSG_DONTWAIT); + debug("RECVMSG: %d\n", ret); + if (ret == -1) + return -1; + + debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags); + debug("CMSG: %zu\n", msg.msg_controllen); + debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len); + + /* Process the metadata */ + supplementary_info = 0; + if (msg.msg_flags & MSG_MORE) + cond = kafs_recv_more; + else if (msg.msg_flags & MSG_EOR) + cond = kafs_recv_complete; + else + cond = kafs_recv_complete_now_send; + + if (msg.msg_flags & MSG_EOR) { + debug("Detected EOR\n"); + call->known_to_kernel = 0; + } + + for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + unsigned char *p; + int n; + + if (cmsg->cmsg_level != SOL_RXRPC) + continue; + + n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); + p = CMSG_DATA(cmsg); + + switch (cmsg->cmsg_type) { + case RXRPC_USER_CALL_ID: + if (*(struct kafs_call_handle **)p != call) { + fprintf(stderr, "Message queue changed from MSG_PEEK\n"); + abort(); + } + break; + + case RXRPC_ABORT: + if (n == 4) + memcpy(&supplementary_info, p, 4); + errno = ECONNABORTED; + cond = kafs_recv_abort; + break; + + case RXRPC_NET_ERROR: + if (n != sizeof(ret)) { + errno = EBADMSG; + return -1; + } + memcpy(&supplementary_info, p, 4); + cond = kafs_recv_net_error; + break; + + case RXRPC_LOCAL_ERROR: + if (n != sizeof(ret)) { + errno = EBADMSG; + return -1; + } + memcpy(&supplementary_info, p, 4); + cond = kafs_recv_local_error; + break; + + case RXRPC_BUSY: + cond = kafs_recv_busy; + break; + + case RXRPC_ACK: + cond = kafs_recv_final_ack; + break; + + case RXRPC_RESPONSE: + break; + + default: + fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type); + break; + } + } + + *_cond = cond; + *_supplementary_info = supplementary_info; + debug("<--af_rxrpc_recv_data() = %d [c=%u s=%d]\n", + ret, cond, supplementary_info); + return ret; +} + +/* + * Abort a call. + */ +static void af_rxrpc_abort_call(struct kafs_call_handle *call, uint32_t abort_code) +{ + struct msghdr msg; + size_t ctrllen; + unsigned char control[128]; + + memset(control, 0, sizeof(control)); + ctrllen = 0; + RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call); + RXRPC_ADD_ABORT(control, ctrllen, abort_code); + + msg.msg_name = &call->conn->peer; + msg.msg_namelen = sizeof(call->conn->peer); + msg.msg_iov = NULL; + msg.msg_iovlen = 0; + msg.msg_control = control; + msg.msg_controllen = ctrllen; + msg.msg_flags = 0; + + sendmsg(call->conn->fd, &msg, 0); +} + +/* + * Terminate a call. + */ +static void af_rxrpc_terminate_call(struct kafs_call_handle *call) +{ + if (usage_put_and_test(&call->usage)) { + af_rxrpc_close_connection(call->conn); + free(call); + } +} + +/* + * Poll a connection to see if it has data available. + */ +static int af_rxrpc_poll_connection(struct kafs_connection_handle *conn, int nowait) +{ + struct pollfd fds[1]; + int ret; + + debug("-->poll()\n"); + + fds[0].fd = conn->fd; + fds[0].events = POLLIN; + fds[0].revents = 0; + + ret = poll(fds, 1, nowait ? 0 : -1); + + if (ret == -1) { + fprintf(stderr, "Poll failed: %m\n"); + return -1; + } + if (ret > 0 && af_rxrpc_recv_mux(conn)) { + fprintf(stderr, "rxrpc_recv_mux failed: %m\n"); + return -1; + } + + return 0; +} + +struct kafs_transport kafs_transport = { + .id = kafs_transport_id, + .name = "AF_RXRPC", + .init_transport = af_rxrpc_init_transport, + .free_transport = af_rxrpc_free_transport, + .new_security = af_rxrpc_new_security, + .free_security = af_rxrpc_free_security, + .open_connection = af_rxrpc_open_connection, + .close_connection = af_rxrpc_close_connection, + .make_call = af_rxrpc_make_call, + .new_service = NULL, + .shutdown_service = NULL, + .accept_call = NULL, + .abort_call = af_rxrpc_abort_call, + .terminate_call = af_rxrpc_terminate_call, + .send_data = af_rxrpc_send_data, + .recv_data = af_rxrpc_recv_data, + .poll_connection = af_rxrpc_poll_connection, +}; diff --git a/kafs/af_rxrpc.h b/af_rxrpc/af_rxrpc.h similarity index 100% rename from kafs/af_rxrpc.h rename to af_rxrpc/af_rxrpc.h diff --git a/kafs/argparse.py b/kafs/argparse.py index be6f350..66f44d1 100644 --- a/kafs/argparse.py +++ b/kafs/argparse.py @@ -16,74 +16,74 @@ as published by the Free Software Foundation; either version from kafs.exception import AFSArgumentError, AFSHelpFlag from kafs.lib.output import set_verbosity -def get_cell(switch, params): +def get_cell(switch, params, transport): from kafs.lib.cell import cell - return cell(params[0]) + return cell(transport, params[0]) -def get_bosserver(switch, params): +def get_bosserver(switch, params, transport): from kafs.lib.bosserver import bosserver return bosserver(params[0]) -def get_fileserver(switch, params): +def get_fileserver(switch, params, transport): from kafs.lib.fileserver import fileserver return fileserver(params[0]) -def get_volserver(switch, params): +def get_volserver(switch, params, transport): from kafs.lib.volserver import volserver return volserver(params[0]) -def get_vlservers(switch, params): +def get_vlservers(switch, params, transport): from kafs.lib.vlserver import vlserver servers = [] for i in params: servers.append(vlserver(params[0])) return servers -def get_volume_name(switch, params): +def get_volume_name(switch, params, transport): return params[0] -def get_volume_names(switch, params): +def get_volume_names(switch, params, transport): return params -def get_machine_name(switch, params): +def get_machine_name(switch, params, transport): return params[0] -def get_machine_names(switch, params): +def get_machine_names(switch, params, transport): return params -def get_path_name(switch, params): +def get_path_name(switch, params, transport): return params[0] -def get_path_names(switch, params): +def get_path_names(switch, params, transport): return params -def get_file_name(switch, params): +def get_file_name(switch, params, transport): return params[0] -def get_file_names(switch, params): +def get_file_names(switch, params, transport): return params -def get_partition_id(switch, params): +def get_partition_id(switch, params, transport): from kafs.lib.partition import part2id return part2id(params[0]) -def get_auth(switch, params): +def get_auth(switch, params, transport): return params -def get_uuid(switch, params): +def get_uuid(switch, params, transport): from kafs.lib.uuid import str2uuid return str2uuid(params[0]) -def get_string(switch, params): +def get_string(switch, params, transport): return params[0] -def get_strings(switch, params): +def get_strings(switch, params, transport): return params -def get_dummy(switch, params): +def get_dummy(switch, params, transport): return params -def get_verbose(switch, params): +def get_verbose(switch, params, transport): set_verbosity() return params @@ -94,10 +94,10 @@ def do_get_id(i): raise AFSArgumentError("UID/GID identifier is not numeric") return int(i) -def get_id(switch, params): +def get_id(switch, params, transport): return do_get_id(params[0]) -def get_ids(switch, params): +def get_ids(switch, params, transport): ids = [] for i in params: ids.append(do_get_id(i)) @@ -110,10 +110,10 @@ def do_get_uid(i): raise AFSArgumentError("UID identifier is a positive integer") return int(i) -def get_uid(switch, params): +def get_uid(switch, params, transport): return do_get_uid(params[0]) -def get_uids(switch, params): +def get_uids(switch, params, transport): ids = [] for i in params: ids.append(do_get_uid(i)) @@ -126,10 +126,10 @@ def do_get_gid(i): raise AFSArgumentError("GID identifier is a positive integer") return int(i) -def get_gid(switch, params): +def get_gid(switch, params, transport): return do_get_gid(params[0]) -def get_gids(switch, params): +def get_gids(switch, params, transport): ids = [] for i in params: ids.append(do_get_gid(i)) @@ -185,7 +185,7 @@ def get_gids(switch, params): # ############################################################################### def parse_arguments(args, available_arguments, argument_size_limits, - cant_combine_arguments): + cant_combine_arguments, transport): result = {} need_switch = False i = 0 # Given arguments index @@ -285,7 +285,7 @@ def parse_arguments(args, available_arguments, argument_size_limits, # Call the syntax checker syntax = match[1] result["raw." + match[0]] = params - result[match[0]] = syntax(match[0], params) + result[match[0]] = syntax(match[0], params, transport) # Check for missing required arguments for j in available_arguments: diff --git a/kafs/commands/pts/setfields.py b/kafs/commands/pts/setfields.py index b1757ad..3215c59 100644 --- a/kafs/commands/pts/setfields.py +++ b/kafs/commands/pts/setfields.py @@ -29,7 +29,7 @@ import sys help = "Set privacy flags or quota for a Protection Database entry" -def get_privacy_flags(switch, params): +def get_privacy_flags(switch, params, transport): s = params[0] if len(s) != 5: raise AFSArgumentError("Privacy flag string must be exactly 5 characters") diff --git a/kafs/lib/cell.py b/kafs/lib/cell.py index add842a..b3fa971 100644 --- a/kafs/lib/cell.py +++ b/kafs/lib/cell.py @@ -36,7 +36,7 @@ class CellError(AFSException): class cell: """Represents an AFS cell. We hold the cell name here and the list of VL servers once we've looked it up""" - def __init__(self, name = None): + def __init__(self, transport, name = None): if name == None: name = linecache.getline("/etc/afs/ThisCell", 1) name = name.strip() @@ -48,6 +48,7 @@ class cell: verbose("New Cell ", name, "\n") self.__name = name + self.__transport = transport self.__looked_up = False self.__vlserver_names = dict() self.__vlservers = [] @@ -178,7 +179,8 @@ class cell: for vladdr in self.query_vl_addrs(): verbose("Trying vlserver ", vladdr, "\n") - z_conn = rpc.rx_new_connection(str(vladdr), rpc.VL_PORT, rpc.VL_SERVICE, + z_conn = rpc.rx_new_connection(self.__transport, + str(vladdr), rpc.VL_PORT, rpc.VL_SERVICE, key, security) try: ret = rpc.VL_Probe(z_conn) @@ -197,7 +199,8 @@ class cell: key, security = self.determine_security(params) verbose("Trying volserver ", server.addr(), "\n") - vol_conn = rpc.rx_new_connection(str(server.addr()), + vol_conn = rpc.rx_new_connection(self.__transport, + str(server.addr()), rpc.VOLSERVICE_PORT, rpc.VOLSERVICE_ID, key, security) @@ -208,7 +211,8 @@ class cell: key, security = self.determine_security(params) verbose("Trying bosserver ", server.addr(), "\n") - bos_conn = rpc.rx_new_connection(str(server.addr()), + bos_conn = rpc.rx_new_connection(self.__transport, + str(server.addr()), rpc.BOSSERVICE_PORT, rpc.BOSSERVICE_ID, key, security) @@ -228,7 +232,8 @@ class cell: verbose("Trying ptserver ", server, "\n") - pt_conn = rpc.rx_new_connection(str(server), + pt_conn = rpc.rx_new_connection(self.__transport, + str(server), rpc.PR_PORT, rpc.PR_SERVICE, key, security) diff --git a/kafs/lib/parse_setrestart_time.py b/kafs/lib/parse_setrestart_time.py index b3282cd..8db1131 100644 --- a/kafs/lib/parse_setrestart_time.py +++ b/kafs/lib/parse_setrestart_time.py @@ -31,7 +31,7 @@ days = [ "sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "satu # Parse the bos setrestart -time argument # ############################################################################### -def parse_restart_time(switch, params): +def parse_restart_time(switch, params, transport): at = rpc.bozo_netKTime() time = params[0].strip().lower() diff --git a/kafs/main.py b/kafs/main.py index a490326..8e2b641 100644 --- a/kafs/main.py +++ b/kafs/main.py @@ -1,5 +1,5 @@ # -# AFS Toolset command switcher +# AFS Toolset command switcher # -*- coding: utf-8 -*- # @@ -30,6 +30,7 @@ import sys, os, traceback import kafs.commands from kafs.lib.output import * from kafs.exception import AFSArgumentError, AFSHelpFlag +import kafs.rpc as rpc ############################################################################### # @@ -124,12 +125,19 @@ def _main(): else: argument_size_limits = {} + # Load the transport module + rpc.load_kafs_transport("/usr/local/lib64/kafs_af_rxrpc.so") + + # Set up a transport + transport = rpc.rx_new_transport() + # Parse the parameters try: params = kafs.argparse.parse_arguments(sys.argv[1:], - command.command_arguments, - argument_size_limits, - cant_combine_arguments) + command.command_arguments, + argument_size_limits, + cant_combine_arguments, + transport) except AFSArgumentError as e: print(prog + ":", e, file=sys.stderr) sys.exit(2) @@ -141,7 +149,7 @@ def _main(): # Stick in the default cell if there isn't one if "cell" not in params: from kafs.lib.cell import cell - params["cell"] = cell() + params["cell"] = cell(transport) params["_prog"] = prog params["_cmdsetmod"] = cmdsetmod diff --git a/kafs/py_rxconn.c b/kafs/py_rxconn.c index 604eb18..0a42311 100644 --- a/kafs/py_rxconn.c +++ b/kafs/py_rxconn.c @@ -105,7 +105,8 @@ PyObject * kafs_py_rx_new_connection(PyObject *_self, PyObject *args) { struct py_rx_connection *obj; - struct rx_connection *z_conn; + struct py_rx_transport *trans; + struct rx_connection *conn; union { struct sockaddr sa; struct sockaddr_in sin; @@ -116,7 +117,8 @@ kafs_py_rx_new_connection(PyObject *_self, PyObject *args) uint16_t port, service, local_port = 0, local_service = 0; int exclusive = 0, security = 0; - if (!PyArg_ParseTuple(args, "sHHzi|HHp", + if (!PyArg_ParseTuple(args, "O!sHHzi|HHp", + &py_rx_transportType, &trans, &address, &port, &service, &key, &security, &local_port, &local_service, &exclusive)) return NULL; @@ -141,14 +143,13 @@ kafs_py_rx_new_connection(PyObject *_self, PyObject *args) py_rx_connection_init((PyObject *)obj, NULL, NULL); assert(obj->x == NULL); - z_conn = rx_new_connection(&sa.sa, salen, service, - local_port, local_service, exclusive, - key, security); - if (!z_conn) { + conn = rx_new_connection(trans->x, &sa.sa, salen, service, + exclusive, key, security); + if (!conn) { Py_DECREF(obj); return errno == ENOMEM ? PyExc_MemoryError : PyErr_SetFromErrno(PyExc_IOError); } - obj->x = z_conn; + obj->x = conn; return (PyObject *)obj; } diff --git a/kafs/py_rxgen.c b/kafs/py_rxgen.c index 27ed553..820f6cd 100644 --- a/kafs/py_rxgen.c +++ b/kafs/py_rxgen.c @@ -1152,11 +1152,14 @@ int py_dec_into_string(struct rx_call *call) needed = call->blob_size - call->blob_offset; segment = call->data_stop - call->data_cursor; - debug("DEC STR dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment); + debug("DEC STR dc=%u bsize=%u-%u seg=%lu-%lu\n", + call->data_count, call->blob_offset, call->blob_size, + call->data_cursor - call->data_start, call->data_stop - call->data_start); if (segment > 0) { if (segment > needed) segment = needed; + debug("DEC STR - seg %u\n", segment); if (call->blob != &rxgen_dec_padding_sink) { for (i = 0; i < segment; i++) PyUnicode_WRITE(PyUnicode_KIND(str), PyUnicode_DATA(str), @@ -1165,9 +1168,14 @@ int py_dec_into_string(struct rx_call *call) call->blob_decoded += segment; call->blob_offset += segment; call->data_cursor += segment; - if (call->blob_size <= call->blob_offset) { + if (call->blob_offset >= call->blob_size) { + if (call->blob_offset > call->blob_size) { + debug("DEC STR buffer overrun\n"); + abort(); + } if (call->blob != &rxgen_dec_padding_sink && call->padding_size > 0) { /* Soak up the padding to a 32-bit boundary */ + debug("DEC STR padding\n"); call->blob = &rxgen_dec_padding_sink; call->blob_size = call->padding_size; call->blob_offset = 0; @@ -1188,8 +1196,6 @@ int py_dec_init_string(struct rx_call *call, PyObject **_str) { PyObject *str; - debug("INIT STRING %u\n", call->blob_size); - str = PyUnicode_New(call->blob_size, 255); if (!str) return -1; @@ -1205,6 +1211,7 @@ int py_dec_init_string(struct rx_call *call, PyObject **_str) call->blob = str; call->blob_offset = 0; call->padding_size = calc_pad_to_4(call->blob_size); + debug("INIT STRING %u,%u\n", call->blob_size, call->padding_size); return 1; } diff --git a/kafs/py_rxgen.h b/kafs/py_rxgen.h index 25ccb61..700568d 100644 --- a/kafs/py_rxgen.h +++ b/kafs/py_rxgen.h @@ -14,6 +14,11 @@ #include "rxgen.h" +struct py_rx_transport { + PyObject_HEAD + struct kafs_transport_handle *x; +}; + struct py_rx_connection { PyObject_HEAD struct rx_connection *x; @@ -50,8 +55,11 @@ struct py_rx_split_info { bool receiving_data; }; +extern PyTypeObject py_rx_transportType; extern PyTypeObject py_rx_connectionType; +extern PyObject *kafs_py_load_kafs_transport(PyObject *, PyObject *); +extern PyObject *kafs_py_rx_new_transport(PyObject *, PyObject *); extern PyObject *kafs_py_rx_new_connection(PyObject *, PyObject *); extern PyObject *kafs_py_string_to_key(PyObject *, PyObject *); diff --git a/kafs/py_rxsplit.c b/kafs/py_rxsplit.c index ef225e1..a659e79 100644 --- a/kafs/py_rxsplit.c +++ b/kafs/py_rxsplit.c @@ -11,7 +11,6 @@ #include #include "structmember.h" -#include #include #include #include "py_rxgen.h" @@ -23,14 +22,7 @@ static int py_rx_split_do_recv(struct rx_call *call, struct py_rx_split_info *split_info) { - struct pollfd fds[1]; - - fds[0].fd = call->conn->fd; - fds[0].events = POLLIN; - fds[0].revents = 0; - poll(fds, 1, 0); - if (fds[0].revents & POLLIN) - rxrpc_recv_data(call->conn, true); + rxrpc_maybe_recv_data(call->conn); return 0; } diff --git a/kafs/py_rxtrans.c b/kafs/py_rxtrans.c new file mode 100644 index 0000000..69b2807 --- /dev/null +++ b/kafs/py_rxtrans.c @@ -0,0 +1,154 @@ +/* Python RxRPC transport container object + * + * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#include +#include "structmember.h" +#include +#include "py_rxgen.h" +#include "rxgen.h" + +/* + * RxRPC transport container. + */ +static int +py_rx_transport_init(PyObject *_self, PyObject *args, PyObject *kwds) +{ + struct py_rx_transport *self = (struct py_rx_transport *)_self; + self->x = NULL; + return 0; +} + +static void +py_rx_transport_dealloc(struct py_rx_transport *self) +{ + if (self->x) { + rx_free_transport(self->x); + self->x = NULL; + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +PyTypeObject py_rx_transportType = { + PyVarObject_HEAD_INIT(NULL, 0) + "kafs.rx_transport", /*tp_name*/ + sizeof(struct py_rx_transport), /*tp_basicsize*/ + 0, /*tp_itemsize*/ + (destructor)py_rx_transport_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + 0, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/ + "RxRPC transport container", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + 0, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + py_rx_transport_init, /* tp_init */ + 0, /* tp_alloc */ + 0, /* tp_new */ +}; + +/* + * Set up an RxRPC transport. + */ +PyObject * +kafs_py_rx_new_transport(PyObject *_self, PyObject *args) +{ + struct py_rx_transport *obj; + struct kafs_transport_handle *trans; + union { + struct sockaddr sa; + struct sockaddr_in sin; + struct sockaddr_in6 sin6; + } sa; + const char *local_address = NULL; + socklen_t salen; + uint16_t local_port = 0; + + if (!PyArg_ParseTuple(args, "|Hs", &local_port, &local_address)) + return NULL; + + memset(&sa, 0, sizeof(sa)); + if (!local_address && !local_port) { + sa.sin.sin_family = AF_INET; + salen = sizeof(sa.sin); + } else if (!local_address) { + sa.sin.sin_family = AF_INET; + sa.sin.sin_port = htons(local_port); + salen = sizeof(sa.sin); + } else if (inet_pton(AF_INET, local_address, &sa.sin.sin_addr)) { + sa.sin.sin_family = AF_INET; + sa.sin.sin_port = htons(local_port); + salen = sizeof(sa.sin); + } else if (inet_pton(AF_INET6, local_address, &sa.sin.sin_addr)) { + sa.sin6.sin6_family = AF_INET6; + sa.sin6.sin6_port = htons(local_port); + salen = sizeof(sa.sin6); + } else { + return PyErr_Format(PyExc_RuntimeError, + "Unsupported network address '%s'", local_address); + } + + obj = (struct py_rx_transport *)_PyObject_New(&py_rx_transportType); + if (!obj) + return PyExc_MemoryError; + py_rx_transport_init((PyObject *)obj, NULL, NULL); + assert(obj->x == NULL); + + trans = rx_new_transport(&sa.sa, salen); + if (!trans) { + Py_DECREF(obj); + return errno == ENOMEM ? PyExc_MemoryError : + PyErr_SetFromErrno(PyExc_IOError); + } + obj->x = trans; + return (PyObject *)obj; +} + +/* + * Load a kAFS transport module. + */ +PyObject * +kafs_py_load_kafs_transport(PyObject *_self, PyObject *args) +{ + const char *name = NULL; + int ret; + + if (!PyArg_ParseTuple(args, "s", &name)) + return NULL; + + ret = load_kafs_transport_module(name); + if (ret < 0) + return errno == ENOMEM ? PyExc_MemoryError : + PyErr_SetFromErrno(PyExc_IOError); + Py_RETURN_TRUE; +} diff --git a/kafs/rxgen.h b/kafs/rxgen.h index eeb123d..3096143 100644 --- a/kafs/rxgen.h +++ b/kafs/rxgen.h @@ -12,17 +12,17 @@ #ifndef _RXGEN_H #define _RXGEN_H -#include "af_rxrpc.h" +#include "transport.h" #include #include #include +#include typedef uint32_t net_xdr_t; struct rx_connection { - struct sockaddr_rxrpc peer; + struct kafs_connection_handle *handle; uint32_t last_abort_code; - int fd; }; #define RXGEN_BUFFER_SIZE 1024 @@ -61,7 +61,10 @@ enum rx_call_state { struct rx_call { uint32_t magic; + struct kafs_call_handle *handle; struct rx_connection *conn; + struct sockaddr sa; + socklen_t sa_len; enum rx_call_state state; unsigned known_to_kernel : 1; unsigned secured : 1; @@ -184,23 +187,29 @@ static inline int rxrpc_post_dec(struct rx_call *call) extern void rxrpc_dec_advance_buffer(struct rx_call *call); extern int rxgen_dec_discard_excess(struct rx_call *call); -extern struct rx_connection *rx_new_connection(const struct sockaddr *sa, +extern int load_kafs_transport_module(const char *name); + +extern struct kafs_transport_handle *rx_new_transport(const struct sockaddr *local_sa, + socklen_t local_salen); +extern void rx_free_transport(struct kafs_transport_handle *trans); + +extern struct rx_connection *rx_new_connection(struct kafs_transport_handle *trans, + const struct sockaddr *sa, socklen_t salen, uint16_t service, - uint16_t local_port, - uint16_t local_service, int exclusive, - const char *key, - int security); + const char *cellname, + enum kafs_connection_auth_level level); -extern void rx_close_connection(struct rx_connection *z_conn); +extern void rx_close_connection(struct rx_connection *conn); -extern struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn, int incoming_call); +extern struct rx_call *rxrpc_make_call(struct rx_connection *conn); extern void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code); extern void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code); extern int rxrpc_send_data(struct rx_call *call); -extern int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait); +extern int rxrpc_recv_data(struct rx_connection *conn, bool nowait); +extern int rxrpc_maybe_recv_data(struct rx_connection *conn); extern int rxrpc_run_sync_call(struct rx_call *call); #endif /* _RXGEN_H */ diff --git a/kafs/af_rxrpc.c b/kafs/transport.c similarity index 57% rename from kafs/af_rxrpc.c rename to kafs/transport.c index 9ec8a4d..e6da4c3 100644 --- a/kafs/af_rxrpc.c +++ b/kafs/transport.c @@ -1,6 +1,6 @@ -/* AF_RXRPC driver +/* Dynamically load the Rx transport module * - * Copyright (C) 2014 Red Hat, Inc. All Rights Reserved. + * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved. * Written by David Howells (dhowells@redhat.com) * * This program is free software; you can redistribute it and/or @@ -9,231 +9,167 @@ * 2 of the Licence, or (at your option) any later version. */ -#define _XOPEN_SOURCE +#include #include #include #include -#include -#include #include #include -#include -#include "af_rxrpc.h" +#include +#include "transport.h" #include "rxgen.h" #define RXGEN_CALL_MAGIC 0x52584745U #define RXGEN_BUF_MAGIC (0x52420000U | __LINE__) #define RXGEN_BUF_DEAD (0x6b6bU | __LINE__) -#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0) +#define debug(fmt, ...) do { if (0) printf("TRAN: "fmt, ## __VA_ARGS__); } while (0) uint32_t rxgen_dec_padding_sink; +static struct kafs_transport *transport; +static char transport_id[] = kafs_transport_id; + +static void rxrpc_data_ready(struct kafs_call_handle *call_handle, void *caller_data); + /* - * dump the control messages + * Load a transport module and extract the entry points */ -static __attribute__((unused)) -void dump_cmsg(struct msghdr *msg) +int load_kafs_transport_module(const char *name) { - struct cmsghdr *cmsg; - unsigned long user_id; - unsigned char *p; - int abort_code; - int n; - - for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) { - n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); - p = CMSG_DATA(cmsg); - - printf("CMSG: %zu: ", cmsg->cmsg_len); - - if (cmsg->cmsg_level == SOL_RXRPC) { - switch (cmsg->cmsg_type) { - case RXRPC_USER_CALL_ID: - printf("RXRPC_USER_CALL_ID: "); - if (n != sizeof(user_id)) - goto dump_data; - memcpy(&user_id, p, sizeof(user_id)); - printf("%lx\n", user_id); - continue; + void *trans; + char *err; - case RXRPC_ABORT: - printf("RXRPC_ABORT: "); - if (n != sizeof(abort_code)) - goto dump_data; - memcpy(&abort_code, p, sizeof(abort_code)); - printf("%d\n", abort_code); - continue; + trans = dlopen(name, RTLD_LAZY); + if (!trans) { + err = dlerror(); + goto error; + } - case RXRPC_ACK: - printf("RXRPC_ACK"); - if (n != 0) - goto dump_data_colon; - goto print_nl; - - case RXRPC_RESPONSE: - printf("RXRPC_RESPONSE"); - if (n != 0) - goto dump_data_colon; - goto print_nl; - - case RXRPC_NET_ERROR: - printf("RXRPC_NET_ERROR: "); - if (n != sizeof(abort_code)) - goto dump_data; - memcpy(&abort_code, p, sizeof(abort_code)); - printf("%s\n", strerror(abort_code)); - continue; + dlerror(); /* clear error */ + transport = dlsym(trans, "kafs_transport"); + err = dlerror(); + if (err) + goto error_2; + + if (memcmp(transport->id, transport_id, sizeof(transport_id)) != 0) { + fprintf(stderr, "Transport '%s' '%s' %zu\n", + transport->id, transport_id, sizeof(transport_id)); + err = "Transport has incorrect ID string\n"; + goto error_2; + } + return 0; - case RXRPC_BUSY: - printf("RXRPC_BUSY"); - if (n != 0) - goto dump_data_colon; - goto print_nl; - - case RXRPC_LOCAL_ERROR: - printf("RXRPC_LOCAL_ERROR: "); - if (n != sizeof(abort_code)) - goto dump_data; - memcpy(&abort_code, p, sizeof(abort_code)); - printf("%s\n", strerror(abort_code)); - continue; +error_2: + dlclose(trans); +error: + fprintf(stderr, "%s: %s\n", name, err); + errno = -ELIBBAD; + return -1; +} - default: - break; - } - } +/* + * Set up a new transport + */ +struct kafs_transport_handle *rx_new_transport(const struct sockaddr *local_sa, + socklen_t local_salen) +{ - printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type); + if (!transport) { + fprintf(stderr, "No Rx transport loaded\n"); + errno = EINVAL; + return NULL; + } - dump_data_colon: - printf(": "); - dump_data: - printf("{"); - for (; n > 0; n--, p++) - printf("%02x", *p); + return transport->init_transport(local_sa, local_salen); +} - print_nl: - printf("}\n"); - } +/* + * Destroy a transport + */ +void rx_free_transport(struct kafs_transport_handle *trans) +{ + transport->free_transport(trans); } /* * Set up a new connection */ -struct rx_connection *rx_new_connection(const struct sockaddr *sa, +struct rx_connection *rx_new_connection(struct kafs_transport_handle *trans_handle, + const struct sockaddr *sa, socklen_t salen, uint16_t service, - uint16_t local_port, - uint16_t local_service, int exclusive, - const char *key, - int security) + const char *cellname, + enum kafs_connection_auth_level level) { - struct sockaddr_rxrpc srx; - struct rx_connection *z_conn; - int ret; - - z_conn = calloc(1, sizeof(*z_conn)); - if (!z_conn) - return NULL; - - z_conn->peer.srx_family = AF_RXRPC; - z_conn->peer.srx_service = service; - z_conn->peer.transport_type = SOCK_DGRAM; - z_conn->peer.transport_len = sizeof(srx.transport.sin); - + struct rx_connection *conn; + const struct sockaddr_in *sin; + const struct sockaddr_in6 *sin6; + struct kafs_security_handle *security; + + /* Do some basic parameter checking */ switch (sa->sa_family) { case 0: errno = EDESTADDRREQ; - goto error_conn; - case AF_INET: - if (salen != sizeof(struct sockaddr_in)) - goto inval; - break; - case AF_INET6: - if (salen != sizeof(struct sockaddr_in6)) - goto inval; - break; - default: - errno = EPROTOTYPE; - goto error_conn; - } - - if (security < RXRPC_SECURITY_PLAIN || - security > RXRPC_SECURITY_ENCRYPT) - goto inval; - - memcpy(&z_conn->peer.transport, sa, salen); - switch (sa->sa_family) { + return NULL; case AF_INET: - if (!z_conn->peer.transport.sin.sin_port) { + if (salen != sizeof(struct sockaddr_in)) { + errno = EINVAL; + return NULL; + } + sin = (const struct sockaddr_in *)sa; + if (!sin->sin_port) { errno = EDESTADDRREQ; - goto error_conn; + return NULL; } break; case AF_INET6: - if (!z_conn->peer.transport.sin6.sin6_port) { + if (salen != sizeof(struct sockaddr_in6)) { + errno = EINVAL; + return NULL; + } + sin6 = (const struct sockaddr_in6 *)sa; + if (!sin6->sin6_port) { errno = EDESTADDRREQ; - goto error_conn; + return NULL; } break; + default: + errno = EPROTOTYPE; + return NULL; } - /* Open up a socket for talking to the AF_RXRPC module */ - z_conn->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET); - if (z_conn->fd < 0) - goto error_conn; - - if (exclusive) { - ret = setsockopt(z_conn->fd, - SOL_RXRPC, RXRPC_EXCLUSIVE_CONNECTION, - NULL, 0); - if (ret == -1) - goto error_conn; + if (level < kafs_connection_no_auth || + level > kafs_connection_encrypt) { + errno = EINVAL; + return NULL; } - if (key) { - ret = setsockopt(z_conn->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL, - &security, sizeof(security)); - if (ret == -1) - goto error_conn; - - ret = setsockopt(z_conn->fd, SOL_RXRPC, RXRPC_SECURITY_KEY, - key, strlen(key)); - if (ret == -1) - goto error_conn; - } + /* Set up our connection tracking */ + conn = calloc(1, sizeof(*conn)); + if (!conn) + return NULL; - /* Bind an address to the local endpoint */ - memset(&srx, 0, sizeof(srx)); - srx.srx_family = AF_RXRPC; - srx.srx_service = local_service; - srx.transport_type = SOCK_DGRAM; - srx.transport_len = salen; - srx.transport.sin.sin_family = sa->sa_family; - switch (sa->sa_family) { - case AF_INET: - srx.transport.sin.sin_port = htons(local_port); - break; - case AF_INET6: - srx.transport.sin6.sin6_port = htons(local_port); - break; - } + /* Create a security handle */ + security = transport->new_security(trans_handle, cellname, level); + if (!security) + goto error_conn; - ret = bind(z_conn->fd, (struct sockaddr *)&srx, sizeof(srx)); - if (ret < 0) - goto error_fd; + /* Open the transport connection */ + conn->handle = transport->open_connection(trans_handle, + security, + sa, salen, + service, + exclusive); + transport->free_security(security); + if (!conn->handle) + goto error_conn; - return z_conn; + return conn; -inval: - errno = EINVAL; - goto error_conn; -error_fd: - close(z_conn->fd); error_conn: - free(z_conn); + free(conn); return NULL; } @@ -241,17 +177,17 @@ error_conn: * Close an RxRPC client connection. This will cause all outstanding * operations to be aborted by the kernel.. */ -void rx_close_connection(struct rx_connection *z_conn) +void rx_close_connection(struct rx_connection *conn) { - close(z_conn->fd); - free(z_conn); + transport->close_connection(conn->handle); + free(conn); } /* - * Allocate a call structure. It is given one buffer ready to go. + * Create a call structure with which to make a call. It is given one buffer + * ready to go. */ -struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn, - int incoming_call) +struct rx_call *rxrpc_make_call(struct rx_connection *conn) { struct rx_call *call; struct rx_buf *buf; @@ -259,43 +195,107 @@ struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn, call = calloc(1, sizeof(struct rx_call)); if (!call) - return NULL; + goto error_0; buf = calloc(1, sizeof(struct rx_buf)); - if (!buf) { - free(call); - return NULL; - } + if (!buf) + goto error_1; data = malloc(RXGEN_BUFFER_SIZE); - if (!data) { - free(buf); - free(call); - return NULL; - } + if (!data) + goto error_2; + + buf->magic = RXGEN_BUF_MAGIC; + buf->buf = data; + + call->magic = RXGEN_CALL_MAGIC; + call->conn = conn; + call->state = rx_call_cl_not_started; + call->more_send = true; + call->more_recv = true; + call->buffer_head = buf; + call->buffer_tail = buf; + call->data_start = data; + call->data_cursor = data; + call->data_stop = data + RXGEN_BUFFER_SIZE; + call->buffer_space = RXGEN_BUFFER_SIZE; + + call->handle = transport->make_call(conn->handle, rxrpc_data_ready, call); + if (!call->handle) + goto error_3; debug("Alloc: buf=%p data=%p\n", buf, data); + return call; + +error_3: + free(data); +error_2: + free(buf); +error_1: + free(call); +error_0: + return NULL; +} + +/* + * Create a call structure with which to accept a call. It is given one buffer + * ready to go. + */ +#if 0 +struct rx_call *rxrpc_accept_call(struct kafs_service_handle *service_handle, + void *caller_data, + const struct sockaddr *peer, + socklen_t peerlen) +{ + struct rx_service *service = caller_data; + struct rx_call *call; + struct rx_buf *buf; + uint8_t *data; + + call = calloc(1, sizeof(struct rx_call)); + if (!call) + goto error_0; + + buf = calloc(1, sizeof(struct rx_buf)); + if (!buf) + goto error_1; + + data = malloc(RXGEN_BUFFER_SIZE); + if (!data) + goto error_2; buf->magic = RXGEN_BUF_MAGIC; buf->buf = data; - if (incoming_call) - call->state = rx_call_sv_not_started; - else - call->state = rx_call_cl_not_started; - call->magic = RXGEN_CALL_MAGIC; - call->conn = z_conn; + call->conn = conn; + call->state = rx_call_sv_not_started; call->more_send = true; call->more_recv = true; call->buffer_head = buf; call->buffer_tail = buf; call->data_start = data; call->data_cursor = data; - call->data_stop = incoming_call ? data : data + RXGEN_BUFFER_SIZE; + call->data_stop = data; call->buffer_space = RXGEN_BUFFER_SIZE; + + call->handle = transport->accept_call(service_handle, peer, peerlen); + if (!call->handle) + goto error_3; + + debug("Alloc: buf=%p data=%p\n", buf, data); return call; + +error_3: + free(data); +error_2: + free(buf); +error_1: + free(call); +error_0: + return NULL; } +#endif static void rxrpc_check_buf(const struct rx_buf *buf) { @@ -329,9 +329,6 @@ static void rxrpc_check_call(const struct rx_call *call) int rxrpc_send_data(struct rx_call *call) { struct rx_buf *cursor; - struct msghdr msg; - size_t ctrllen, i; - unsigned char control[128]; struct iovec iov[16]; unsigned more; int ioc, ret; @@ -354,17 +351,6 @@ int rxrpc_send_data(struct rx_call *call) } more_to_send: - /* Request an operation */ - ctrllen = 0; - RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call); - - msg.msg_name = &call->conn->peer; - msg.msg_namelen = sizeof(call->conn->peer); - msg.msg_iov = iov; - msg.msg_control = control; - msg.msg_controllen = ctrllen; - msg.msg_flags = 0; - /* We may have a completely sent buffer on the front of the queue if * this was the last buffer on the last send. The buffer queue isn't * allowed to be empty at any point. @@ -380,7 +366,7 @@ more_to_send: free(sent); } - more = MSG_MORE; + more = 1; for (ioc = 0; ioc < 16; ioc++) { rxrpc_check_buf(cursor); @@ -404,17 +390,8 @@ more_to_send: } cursor = cursor->next; } - msg.msg_iovlen = ioc; - - /* Send the data */ - //dump_cmsg(&msg); - - for (i = 0; i < msg.msg_iovlen; i++) - debug("IOV[%02zu] %04zu %p\n", - i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base); - ret = sendmsg(call->conn->fd, &msg, more); - debug("SENDMSG: %d%s\n", ret, more ? " [more]" : ""); + ret = transport->send_data(call->handle, iov, ioc, more); if (ret == -1) return -1; @@ -470,63 +447,16 @@ more_to_send: } /* - * Receive data from a socket. + * Call back from the transport to indicate that data is ready. */ -int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait) +static void rxrpc_data_ready(struct kafs_call_handle *call_handle, void *caller_data) { - struct rx_call *call; + struct rx_call *call = caller_data; struct rx_buf *bufs[4] = { NULL }, *cursor; - struct sockaddr_rxrpc srx; - struct cmsghdr *cmsg; - struct msghdr msg; struct iovec iov[4]; - unsigned char control[128]; - uint32_t tmpbuf[1]; - int ioc, ret; - - /* Peek at the next message */ - iov[0].iov_base = &tmpbuf; - iov[0].iov_len = sizeof(tmpbuf); - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - msg.msg_name = &srx; - msg.msg_namelen = sizeof(srx); - msg.msg_control = control; - msg.msg_controllen = sizeof(control); - msg.msg_flags = 0; - - ret = recvmsg(z_conn->fd, &msg, MSG_PEEK | (nowait ? MSG_DONTWAIT : 0)); - debug("RECVMSG PEEK: %d\n", ret); - if (ret == -1) - return -1; - - /* Find the call ID. */ - call = NULL; - for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - unsigned char *p; - - if (cmsg->cmsg_level != SOL_RXRPC || - cmsg->cmsg_type != RXRPC_USER_CALL_ID) - continue; - - p = CMSG_DATA(cmsg); - call = *(struct rx_call **)p; - break; - } - if (!call) - abort(); - - /* Now actually retrieve that message */ - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 0; - msg.msg_name = &srx; - msg.msg_namelen = sizeof(srx); - msg.msg_control = control; - msg.msg_controllen = sizeof(control); - msg.msg_flags = 0; + enum kafs_recv_condition cond; + uint32_t supplementary_info; + int i, ioc, ret; debug("Recv: buf[0]=%p data[0]=%p (io=%u)\n", call->buffer_tail, call->buffer_tail->buf, call->buffer_tail->io_cursor); @@ -538,73 +468,69 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait) bufs[0] = call->buffer_tail; iov[0].iov_base = bufs[0]->buf + bufs[0]->io_cursor; iov[0].iov_len = RXGEN_BUFFER_SIZE - bufs[0]->io_cursor; - msg.msg_iovlen = 1; + ioc = 1; } else { - msg.msg_iovlen = 0; + ioc = 0; } - for (ioc = msg.msg_iovlen; ioc < 4; ioc++) { - bufs[ioc] = calloc(1, sizeof(struct rx_buf)); - bufs[ioc]->magic = RXGEN_BUF_MAGIC; - if (!bufs[ioc]) { - while (--ioc >= (int)msg.msg_iovlen) { - bufs[ioc]->magic = RXGEN_BUF_DEAD; - free(bufs[ioc]->buf); - free(bufs[ioc]); + for (i = ioc; i < 4; i++) { + bufs[i] = calloc(1, sizeof(struct rx_buf)); + bufs[i]->magic = RXGEN_BUF_MAGIC; + if (!bufs[i]) { + while (--i >= ioc) { + bufs[i]->magic = RXGEN_BUF_DEAD; + free(bufs[i]->buf); + free(bufs[i]); } - return -1; + return; } - bufs[ioc]->buf = malloc(RXGEN_BUFFER_SIZE); - if (!bufs[ioc]->buf) { - bufs[ioc]->magic = RXGEN_BUF_DEAD; - free(bufs[ioc]); - while (--ioc >= (int)msg.msg_iovlen) { - bufs[ioc]->magic = RXGEN_BUF_DEAD; - free(bufs[ioc]->buf); - free(bufs[ioc]); + bufs[i]->buf = malloc(RXGEN_BUFFER_SIZE); + if (!bufs[i]->buf) { + bufs[i]->magic = RXGEN_BUF_DEAD; + free(bufs[i]); + while (--i >= ioc) { + bufs[i]->magic = RXGEN_BUF_DEAD; + free(bufs[i]->buf); + free(bufs[i]); } - return -1; + return; } - iov[ioc].iov_base = bufs[ioc]->buf; - iov[ioc].iov_len = RXGEN_BUFFER_SIZE; + iov[i].iov_base = bufs[i]->buf; + iov[i].iov_len = RXGEN_BUFFER_SIZE; } - msg.msg_iovlen = 4; + ioc = 4; - ret = recvmsg(z_conn->fd, &msg, 0); - debug("RECVMSG: %d\n", ret); + ret = transport->recv_data(call->handle, iov, ioc, &cond, &supplementary_info); + debug("RECVDATA: %d\n", ret); if (ret == -1) - return -1; - - debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags); - debug("CMSG: %zu\n", msg.msg_controllen); - debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len); + return; call->bytes_received += ret; /* Attach any used buffers to the call and discard the rest */ if (ret > 0) { - for (ioc = 0; ioc < 4 && ret > 0; ioc++) { - unsigned added = RXGEN_BUFFER_SIZE - bufs[ioc]->io_cursor; - debug("xfer[%d] space=%u rem=%d\n", ioc, added, ret); + for (i = 0; i < 4 && ret > 0; i++) { + unsigned added = RXGEN_BUFFER_SIZE - bufs[i]->io_cursor; + debug("xfer[%d] space=%u rem=%d\n", i, added, ret); if (added > (unsigned)ret) added = (unsigned)ret; - bufs[ioc]->io_cursor += added; + bufs[i]->io_cursor += added; call->data_count += added; ret -= added; - if (call->buffer_tail == bufs[ioc]) + if (call->buffer_tail == bufs[i]) continue; - call->buffer_tail->next = bufs[ioc]; - call->buffer_tail = bufs[ioc]; + call->buffer_tail->next = bufs[i]; + call->buffer_tail = bufs[i]; if (ret <= 0) { - ioc++; + i++; break; } } - for (; ioc < 4; ioc++) { - bufs[ioc]->magic = RXGEN_BUF_DEAD; - free(bufs[ioc]->buf); - free(bufs[ioc]); + for (; i < 4; i++) { + bufs[i]->magic = RXGEN_BUF_DEAD; + free(bufs[i]->buf); + free(bufs[i]); } } @@ -614,75 +540,52 @@ int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait) debug("RecvQ buf=%p data=%p iocur=%u\n", cursor, cursor->buf, cursor->io_cursor); - /* Process the metadata */ - if (msg.msg_flags & MSG_EOR) + /* Process the call state given to us by the receiver. */ + switch (cond) { + case kafs_recv_more: + break; + case kafs_recv_final_ack: + if (call->state == rx_call_sv_waiting_for_final_ack) { + call->state = rx_call_sv_complete; + } else { + fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n", + call->state); + abort(); + } + case kafs_recv_complete: call->known_to_kernel = 0; - if (!(msg.msg_flags & MSG_MORE)) + case kafs_recv_complete_now_send: call->more_recv = false; + break; - for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { - unsigned char *p; - int n; - - if (cmsg->cmsg_level != SOL_RXRPC) - continue; - - n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg)); - p = CMSG_DATA(cmsg); - - switch (cmsg->cmsg_type) { - case RXRPC_ABORT: - if (n != sizeof(call->abort_code)) - call->abort_code = 0; - else - memcpy(&call->abort_code, p, sizeof(call->abort_code)); - z_conn->last_abort_code = call->abort_code; - call->error_code = ECONNABORTED; - call->state = rx_call_remotely_aborted; - break; - - case RXRPC_NET_ERROR: - if (n != sizeof(ret)) { - errno = EBADMSG; - return -1; - } - memcpy(&ret, p, sizeof(ret)); - call->error_code = ret; - call->state = rx_call_net_error; - break; - - case RXRPC_LOCAL_ERROR: - if (n != sizeof(ret)) { - errno = EBADMSG; - return -1; - } - memcpy(&ret, p, sizeof(ret)); - call->error_code = ret; - call->state = rx_call_local_error; - break; - - case RXRPC_BUSY: - call->error_code = ECONNREFUSED; - call->state = rx_call_rejected_busy; - break; - - case RXRPC_ACK: - if (call->state == rx_call_sv_waiting_for_final_ack) { - call->state = rx_call_sv_complete; - } else { - fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n", - call->state); - abort(); - } - break; - - case RXRPC_RESPONSE: - call->secured = 1; - break; - - default: - break; - } + case kafs_recv_abort: + call->conn->last_abort_code = supplementary_info; + call->abort_code = supplementary_info; + call->error_code = ECONNABORTED; + call->state = rx_call_remotely_aborted; + call->known_to_kernel = 0; + call->more_recv = false; + break; + case kafs_recv_net_error: + call->error_code = supplementary_info; + call->state = rx_call_net_error; + call->known_to_kernel = 0; + call->more_recv = false; + break; + case kafs_recv_local_error: + call->error_code = supplementary_info; + call->state = rx_call_local_error; + call->known_to_kernel = 0; + call->more_recv = false; + break; + case kafs_recv_busy: + call->error_code = ECONNREFUSED; + call->state = rx_call_rejected_busy; + call->known_to_kernel = 0; + call->more_recv = false; + break; + default: + abort(); } /* Switch into appropriate decode state */ @@ -699,7 +602,7 @@ loop: * and is willing to settle for nothing also. */ if (call->data_count <= 0 && call->more_recv) - return 0; + return; } else if (call->data_count < call->need_size) { if (!call->more_recv) { /* Short data */ @@ -707,7 +610,7 @@ loop: call->error_code = ENODATA; rxrpc_abort_call(call, 1); } - return 0; + return; } /* Handle reception of the opcode in a server-side call. Note @@ -721,6 +624,7 @@ loop: goto loop; } + debug("DEC %u > %u\n", call->data_count, call->need_size); ret = call->decoder(call); if (ret < 0) { rxrpc_abort_call(call, 1); @@ -746,7 +650,7 @@ loop: /* Invoke the processor */ call->processor(call); - return 0; + return; } } break; @@ -757,14 +661,12 @@ loop: case rx_call_sv_processing: call->failed(call); - return 0; + return; default: fprintf(stderr, "RxRPC: Recv in bad call state (%d)\n", call->state); abort(); } - - return 0; } /* @@ -772,25 +674,9 @@ loop: */ void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code) { - struct msghdr msg; - size_t ctrllen; - unsigned char control[128]; if (call->known_to_kernel) { - memset(control, 0, sizeof(control)); - ctrllen = 0; - RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call); - RXRPC_ADD_ABORT(control, ctrllen, abort_code); - - msg.msg_name = &call->conn->peer; - msg.msg_namelen = sizeof(call->conn->peer); - msg.msg_iov = NULL; - msg.msg_iovlen = 0; - msg.msg_control = control; - msg.msg_controllen = ctrllen; - msg.msg_flags = 0; - - sendmsg(call->conn->fd, &msg, 0); + transport->abort_call(call->handle, abort_code); call->known_to_kernel = 0; } call->state = rx_call_locally_aborted; @@ -806,6 +692,7 @@ void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code) rxrpc_check_call(call); if (call->known_to_kernel) rxrpc_abort_call(call, abort_code); + transport->terminate_call(call->handle); call->magic = 0x7a7b7c7d; for (cursor = call->buffer_head; cursor; cursor = next) { rxrpc_check_buf(cursor); @@ -819,26 +706,26 @@ void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code) free(call); } +/* + * Poll the transport and receive data if there's anything available. + */ +int rxrpc_maybe_recv_data(struct rx_connection *conn) +{ + return transport->poll_connection(conn->handle, true); +} + /* * Run a single call synchronously. */ int rxrpc_run_sync_call(struct rx_call *call) { - struct pollfd fds[1]; + int ret; while (call->known_to_kernel) { - fds[0].fd = call->conn->fd; - fds[0].events = POLLIN; - fds[0].revents = 0; - - if (poll(fds, 1, -1) == -1) { - fprintf(stderr, "Poll failed: %m\n"); - return -1; - } - if (rxrpc_recv_data(call->conn, false)) { - fprintf(stderr, "rxrpc_recv_data failed: %m\n"); - return -1; - } + debug("Run call %d\n", call->known_to_kernel); + ret = transport->poll_connection(call->conn->handle, false); + if (ret < 0) + return ret; } switch (call->state) { @@ -1002,15 +889,25 @@ void rxrpc_dec_advance_buffer(struct rx_call *call) */ cursor = call->buffer_head; segment = cursor->io_cursor; + debug("DEC Advance %u\n", segment); new_stop = cursor->buf + segment; if (call->data_stop >= new_stop) { /* This buffer must then be completely used as we're required to check * amount received before reading it. */ - if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE) - abort(); /* Didn't completely consume a buffer */ - if (call->buffer_tail == cursor) - abort(); /* Unexpectedly out of data */ + if (call->data_stop > new_stop) { + debug("buffer overrun (%p > %p)\n", call->data_stop, new_stop); + abort(); + } + if (new_stop != cursor->buf + cursor->io_cursor) { + debug("buffer not consumed (%p != %p)\n", + new_stop, cursor->buf + cursor->io_cursor); + abort(); + } + if (call->buffer_tail == cursor) { + debug("unexpectedly out of data\n"); + abort(); + } /* Move to the next buffer */ rxrpc_post_dec(call); diff --git a/kafs/transport.h b/kafs/transport.h new file mode 100644 index 0000000..87abdf8 --- /dev/null +++ b/kafs/transport.h @@ -0,0 +1,124 @@ +/* Access points into the dynamically loaded Rx transport module + * + * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved. + * Written by David Howells (dhowells@redhat.com) + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public Licence + * as published by the Free Software Foundation; either version + * 2 of the Licence, or (at your option) any later version. + */ + +#ifndef _KAFS_TRANSPORT_H +#define _KAFS_TRANSPORT_H + +#include +#include +#include + +struct kafs_transport_handle; +struct kafs_security_handle; +struct kafs_connection_handle; +struct kafs_call_handle; + +enum kafs_connection_auth_level { + kafs_connection_no_auth, + kafs_connection_local_auth, + kafs_connection_clear, + kafs_connection_integrity_only, + kafs_connection_encrypt, +}; + +enum kafs_recv_condition { + kafs_recv_more, + kafs_recv_final_ack, + kafs_recv_complete, + kafs_recv_complete_now_send, + kafs_recv_abort, + kafs_recv_net_error, + kafs_recv_local_error, + kafs_recv_busy, +}; + +#define kafs_transport_id "kAFS-utils tran" + +struct kafs_transport_handle; +struct kafs_security_handle; +struct kafs_connection_handle; +struct kafs_service_handle; +struct kafs_call_handle; + +typedef void (*kafs_incoming_call_func)(struct kafs_service_handle *service, + void *caller_data, + const struct sockaddr *peer, + socklen_t *peerlen); + +typedef void (*kafs_data_ready_func)(struct kafs_call_handle *call, + void *caller_data); + +struct kafs_transport { + char id[16]; /* = kafs_transport_id */ + char name[16]; /* Name of transport */ + + struct kafs_transport_handle *(*init_transport)(const struct sockaddr *sa, + socklen_t salen); + + void (*free_transport)(struct kafs_transport_handle *transport); + + struct kafs_security_handle *(*new_security)(struct kafs_transport_handle *transport, + const char *cell_name, + enum kafs_connection_auth_level level); + + void (*free_security)(struct kafs_security_handle *security); + + struct kafs_connection_handle *(*open_connection)(struct kafs_transport_handle *transport, + struct kafs_security_handle *security, + const struct sockaddr *sa, + socklen_t salen, + uint16_t service, + int exclusive_connection); + + void (*close_connection)(struct kafs_connection_handle *connection); + + struct kafs_call_handle *(*make_call)(struct kafs_connection_handle *connection, + kafs_data_ready_func data_ready_func, + void *caller_data); + + struct kafs_service_handle *(*new_service)(struct kafs_transport_handle *transport, + kafs_incoming_call_func new_call_func, + void *caller_data, + uint16_t local_port, + uint16_t local_service, + struct kafs_security_handle **permits); + + void (*shutdown_service)(struct kafs_service_handle *service); + + struct kafs_call_handle *(*accept_call)(struct kafs_service_handle *service, + kafs_data_ready_func data_ready_func, + void *caller_data, + struct sockaddr *_sa, + socklen_t *_salen); + + void (*abort_call)(struct kafs_call_handle *call, + uint32_t abort_code); + + void (*terminate_call)(struct kafs_call_handle *call); + + int (*send_data)(struct kafs_call_handle *call, + struct iovec *iov, + int ioc, + int more); + + int (*recv_data)(struct kafs_call_handle *call, + struct iovec *iov, + int ioc, + enum kafs_recv_condition *_cond, + uint32_t *_supplementary_info); + + int (*poll_connection)(struct kafs_connection_handle *conn, + int nowait); +}; + +extern struct kafs_transport kafs_transport; + +#endif /* _KAFS_TRANSPORT_H */ diff --git a/rxgen/emit_c_sync_funcs.py b/rxgen/emit_c_sync_funcs.py index b386bc9..c362f6e 100644 --- a/rxgen/emit_c_sync_funcs.py +++ b/rxgen/emit_c_sync_funcs.py @@ -481,7 +481,7 @@ def emit_func_send(o, func, what): # Allocate call if what == "request": o.rxsrc("\n") - o.rxsrc("\tcall = rxrpc_alloc_call(z_conn, 0);\n") + o.rxsrc("\tcall = rxrpc_make_call(z_conn);\n") o.rxsrc("\tif (!call)\n") o.rxsrc("\t\treturn ", bad_ret, ";\n") o.rxsrc("\tcall->decoder = rxgen_decode_", func.name, "_response;\n") diff --git a/rxgen/emit_py_module.py b/rxgen/emit_py_module.py index aa669b1..af63a6e 100644 --- a/rxgen/emit_py_module.py +++ b/rxgen/emit_py_module.py @@ -72,6 +72,8 @@ def emit_py_module(o): o.pysrc(" */\n") o.pysrc("static PyMethodDef module_methods[] = {\n") + o.pysrc("\t{\"load_kafs_transport\", (PyCFunction)kafs_py_load_kafs_transport, METH_VARARGS, \"\" },\n") + o.pysrc("\t{\"rx_new_transport\", (PyCFunction)kafs_py_rx_new_transport, METH_VARARGS, \"\" },\n") o.pysrc("\t{\"rx_new_connection\", (PyCFunction)kafs_py_rx_new_connection, METH_VARARGS, \"\" },\n") o.pysrc("\t{\"afs_string_to_key\", (PyCFunction)kafs_py_string_to_key, METH_VARARGS, \"\" },\n") @@ -104,6 +106,7 @@ def emit_py_module(o): # Load types if o.xdr.py_type_defs: o.pysrc("\tif (") + o.pysrc("PyType_Ready(&py_rx_transportType) < 0 ||\n\t ") o.pysrc("PyType_Ready(&py_rx_connectionType) < 0 ||\n\t ") o.pysrc("PyType_Ready(&py_rx_split_infoType) < 0") for pyt in o.xdr.py_type_defs: diff --git a/rxgen/emit_py_sync_funcs.py b/rxgen/emit_py_sync_funcs.py index 61a7244..902897a 100644 --- a/rxgen/emit_py_sync_funcs.py +++ b/rxgen/emit_py_sync_funcs.py @@ -342,7 +342,7 @@ def emit_py_func_simple_sync_call(o, func): o.pysrc("\t\treturn NULL;\n") o.pysrc("\n") - o.pysrc("\tcall = rxrpc_alloc_call(z_conn->x, 0);\n") + o.pysrc("\tcall = rxrpc_make_call(z_conn->x);\n") o.pysrc("\tif (!call) {\n") if func.split: o.pysrc("\t\tPy_XDECREF(split_info);\n"); diff --git a/setup.py b/setup.py index edc5a34..db0e061 100644 --- a/setup.py +++ b/setup.py @@ -47,9 +47,10 @@ setup(name = "kafs", "kafs/rxgen_afs_py.c", "kafs/py_passwd.c", "kafs/py_rxgen.c", + "kafs/py_rxtrans.c", "kafs/py_rxconn.c", "kafs/py_rxsplit.c", - "kafs/af_rxrpc.c" + "kafs/transport.c" ], extra_compile_args = [ "-O0", -- 2.50.1