-build:
+run:
python3 setup.py build
rpm:
--- /dev/null
+kafs_af_rxrpc.so: af_rxrpc.c af_rxrpc.h Makefile ../kafs/transport.h
+ $(CC) -shared -fPIC -O2 -g -Wall -o $@ af_rxrpc.c -I../kafs
--- /dev/null
+/* AF_RXRPC driver
+ *
+ * Copyright (C) 2014 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#define _XOPEN_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <poll.h>
+#include <errno.h>
+#include <limits.h>
+#include <sys/socket.h>
+#include "transport.h"
+#include "af_rxrpc.h"
+
+#define RXGEN_CALL_MAGIC 0x52584745U
+#define RXGEN_BUF_MAGIC (0x52420000U | __LINE__)
+#define RXGEN_BUF_DEAD (0x6b6bU | __LINE__)
+
+#define debug(fmt, ...) do { if (0) printf("AFRX: "fmt, ## __VA_ARGS__); } while (0)
+
+uint32_t rxgen_dec_padding_sink;
+
+struct usage {
+ int count;
+};
+
+static inline void usage_set(struct usage *usage)
+{
+ usage->count = 1;
+}
+
+static inline void usage_get(struct usage *usage)
+{
+ usage->count++;
+}
+
+static inline int usage_put_and_test(struct usage *usage)
+{
+ return --usage->count == 0;
+}
+
+struct kafs_transport_handle {
+ struct usage usage;
+ struct sockaddr_rxrpc local;
+};
+
+struct kafs_security_handle {
+ struct usage usage;
+ struct kafs_transport_handle *transport;
+ enum kafs_connection_auth_level level;
+ unsigned min_sec_level;
+ unsigned cell_name_len;
+ char cell_name[];
+};
+
+struct kafs_connection_handle {
+ struct usage usage;
+ int fd;
+ struct kafs_transport_handle *transport;
+ struct kafs_security_handle *security;
+ struct sockaddr_rxrpc peer;
+};
+
+struct kafs_call_handle {
+ struct usage usage;
+ struct kafs_connection_handle *conn;
+ kafs_data_ready_func data_ready_func;
+ void *caller_data;
+ int known_to_kernel;
+};
+
+/*
+ * dump the control messages
+ */
+static __attribute__((unused))
+void dump_cmsg(struct msghdr *msg)
+{
+ struct cmsghdr *cmsg;
+ unsigned long user_id;
+ unsigned char *p;
+ int abort_code;
+ int n;
+
+ for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+ n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+ p = CMSG_DATA(cmsg);
+
+ printf("CMSG: %zu: ", cmsg->cmsg_len);
+
+ if (cmsg->cmsg_level == SOL_RXRPC) {
+ switch (cmsg->cmsg_type) {
+ case RXRPC_USER_CALL_ID:
+ printf("RXRPC_USER_CALL_ID: ");
+ if (n != sizeof(user_id))
+ goto dump_data;
+ memcpy(&user_id, p, sizeof(user_id));
+ printf("%lx\n", user_id);
+ continue;
+
+ case RXRPC_ABORT:
+ printf("RXRPC_ABORT: ");
+ if (n != sizeof(abort_code))
+ goto dump_data;
+ memcpy(&abort_code, p, sizeof(abort_code));
+ printf("%d\n", abort_code);
+ continue;
+
+ case RXRPC_ACK:
+ printf("RXRPC_ACK");
+ if (n != 0)
+ goto dump_data_colon;
+ goto print_nl;
+
+ case RXRPC_RESPONSE:
+ printf("RXRPC_RESPONSE");
+ if (n != 0)
+ goto dump_data_colon;
+ goto print_nl;
+
+ case RXRPC_NET_ERROR:
+ printf("RXRPC_NET_ERROR: ");
+ if (n != sizeof(abort_code))
+ goto dump_data;
+ memcpy(&abort_code, p, sizeof(abort_code));
+ printf("%s\n", strerror(abort_code));
+ continue;
+
+ case RXRPC_BUSY:
+ printf("RXRPC_BUSY");
+ if (n != 0)
+ goto dump_data_colon;
+ goto print_nl;
+
+ case RXRPC_LOCAL_ERROR:
+ printf("RXRPC_LOCAL_ERROR: ");
+ if (n != sizeof(abort_code))
+ goto dump_data;
+ memcpy(&abort_code, p, sizeof(abort_code));
+ printf("%s\n", strerror(abort_code));
+ continue;
+
+ default:
+ break;
+ }
+ }
+
+ printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type);
+
+ dump_data_colon:
+ printf(": ");
+ dump_data:
+ printf("{");
+ for (; n > 0; n--, p++)
+ printf("%02x", *p);
+
+ print_nl:
+ printf("}\n");
+ }
+}
+
+/*
+ * Initialise the transport module
+ */
+static struct kafs_transport_handle *af_rxrpc_init_transport(const struct sockaddr *sa,
+ socklen_t salen)
+{
+ struct kafs_transport_handle *trans;
+ switch (sa->sa_family) {
+ case AF_INET:
+ case AF_INET6:
+ break;
+ default:
+ fprintf(stderr, "Unsupported endpoint type\n");
+ errno = EPROTOTYPE;
+ return NULL;
+ }
+
+ trans = calloc(1, sizeof(*trans));
+ if (!trans)
+ return NULL;
+
+ usage_set(&trans->usage);
+ trans->local.srx_family = AF_RXRPC;
+ trans->local.srx_service = 0;
+ trans->local.transport_type = SOCK_DGRAM;
+ trans->local.transport_len = salen;
+ memcpy(&trans->local.transport, sa, salen);
+
+ return trans;
+}
+
+/*
+ * Free the transport module
+ */
+static void af_rxrpc_free_transport(struct kafs_transport_handle *trans)
+{
+ if (usage_put_and_test(&trans->usage))
+ free(trans);
+}
+
+/*
+ * Set up a security descriptor
+ */
+static struct kafs_security_handle *af_rxrpc_new_security(
+ struct kafs_transport_handle *transport,
+ const char *cell_name,
+ enum kafs_connection_auth_level level)
+{
+ struct kafs_security_handle *security;
+ size_t namelen = strlen(cell_name);
+
+ security = calloc(1, sizeof(*security) + namelen + 1);
+ if (!security)
+ return NULL;
+
+ usage_set(&security->usage);
+ security->transport = transport;
+ security->cell_name_len = namelen;
+ security->level = level;
+ switch (level) {
+ case kafs_connection_no_auth:
+ break;
+ case kafs_connection_local_auth:
+ fprintf(stderr, "Local auth not supported by AF_RXRPC transport\n");
+ errno = -EINVAL;
+ free(security);
+ return NULL;
+ case kafs_connection_clear:
+ security->min_sec_level = RXRPC_SECURITY_PLAIN;
+ break;
+ case kafs_connection_integrity_only:
+ security->min_sec_level = RXRPC_SECURITY_AUTH;
+ break;
+ case kafs_connection_encrypt:
+ security->min_sec_level = RXRPC_SECURITY_ENCRYPT;
+ break;
+ }
+ memcpy(security->cell_name, cell_name, namelen + 1);
+ usage_get(&security->transport->usage);
+ return security;
+}
+
+/*
+ * Free a security descriptor
+ */
+static void af_rxrpc_free_security(struct kafs_security_handle *security)
+{
+ if (usage_put_and_test(&security->usage)) {
+ af_rxrpc_free_transport(security->transport);
+ free(security);
+ }
+}
+
+/*
+ * Set up a new connection
+ */
+static struct kafs_connection_handle *af_rxrpc_open_connection(
+ struct kafs_transport_handle *transport,
+ struct kafs_security_handle *security,
+ const struct sockaddr *sa,
+ socklen_t salen,
+ uint16_t service,
+ int exclusive_connection)
+{
+ struct kafs_connection_handle *conn;
+ int ret;
+
+ if (salen > sizeof(conn->peer.transport)) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ conn = calloc(1, sizeof(*conn));
+ if (!conn)
+ return NULL;
+ usage_set(&conn->usage);
+ conn->transport = transport;
+ conn->security = security;
+
+ conn->peer.srx_family = AF_RXRPC;
+ conn->peer.srx_service = service;
+ conn->peer.transport_type = SOCK_DGRAM;
+ conn->peer.transport_len = salen;
+ memcpy(&conn->peer.transport, sa, salen);
+
+ /* Open up a socket for talking to the AF_RXRPC module */
+ conn->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET);
+ if (conn->fd < 0)
+ goto error_conn;
+
+ if (exclusive_connection) {
+ ret = setsockopt(conn->fd,
+ SOL_RXRPC, RXRPC_EXCLUSIVE_CONNECTION,
+ NULL, 0);
+ if (ret == -1)
+ goto error_conn;
+ }
+
+ if (security) {
+ ret = setsockopt(conn->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
+ &security->min_sec_level,
+ sizeof(security->min_sec_level));
+ if (ret == -1)
+ goto error_conn;
+
+ ret = setsockopt(conn->fd, SOL_RXRPC, RXRPC_SECURITY_KEY,
+ security->cell_name, security->cell_name_len);
+ if (ret == -1)
+ goto error_conn;
+ }
+
+ /* Bind an address to the local endpoint */
+ ret = bind(conn->fd, (struct sockaddr *)&conn->transport->local,
+ sizeof(conn->transport->local));
+ if (ret < 0)
+ goto error_fd;
+
+ usage_get(&security->usage);
+ usage_get(&transport->usage);
+ return conn;
+
+error_fd:
+ close(conn->fd);
+error_conn:
+ free(conn);
+ return NULL;
+}
+
+/*
+ * Close an RxRPC client connection. This will cause all outstanding
+ * operations to be aborted by the kernel..
+ */
+static void af_rxrpc_close_connection(struct kafs_connection_handle *conn)
+{
+ if (usage_put_and_test(&conn->usage)) {
+ close(conn->fd);
+ af_rxrpc_free_security(conn->security);
+ af_rxrpc_free_transport(conn->transport);
+ free(conn);
+ }
+}
+
+/*
+ * Allocate a call handle for a new call.
+ */
+static struct kafs_call_handle *af_rxrpc_make_call(
+ struct kafs_connection_handle *conn,
+ kafs_data_ready_func data_ready_func,
+ void *caller_data)
+{
+ struct kafs_call_handle *call;
+
+ call = calloc(1, sizeof(*call));
+ if (!call)
+ return NULL;
+
+ usage_set(&call->usage);
+ call->conn = conn;
+ call->data_ready_func = data_ready_func;
+ call->caller_data = caller_data;
+ usage_get(&conn->usage);
+ return call;
+}
+
+/*
+ * Send buffered data.
+ */
+static int af_rxrpc_send_data(struct kafs_call_handle *call,
+ struct iovec *iov,
+ int ioc,
+ int more)
+{
+ struct msghdr msg;
+ size_t ctrllen;
+ unsigned char control[128];
+ int ret, i;
+
+ /* Request an operation */
+ ctrllen = 0;
+ RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
+
+ msg.msg_name = &call->conn->peer;
+ msg.msg_namelen = sizeof(call->conn->peer);
+ msg.msg_iov = iov;
+ msg.msg_iovlen = ioc;
+ msg.msg_control = control;
+ msg.msg_controllen = ctrllen;
+ msg.msg_flags = 0;
+
+ if (more)
+ more = MSG_MORE;
+
+ //dump_cmsg(&msg);
+ for (i = 0; i < msg.msg_iovlen; i++)
+ debug("IOV[%02u] %04zu %p\n",
+ i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base);
+
+ /* Send the data */
+ ret = sendmsg(call->conn->fd, &msg, more);
+ debug("SENDMSG: %d%s\n", ret, more ? " [more]" : "");
+ if (ret >= 0)
+ call->known_to_kernel = 1;
+ return ret;
+}
+
+/*
+ * Receive data from a connection and pass it to the appropriate call.
+ */
+static int af_rxrpc_recv_mux(struct kafs_connection_handle *conn)
+{
+ struct kafs_call_handle *call;
+ struct sockaddr_rxrpc srx;
+ struct cmsghdr *cmsg;
+ struct msghdr msg;
+ struct iovec iov[1];
+ unsigned char control[128];
+ uint32_t tmpbuf[1];
+ int ret;
+
+ for (;;) {
+ /* Peek at the next message */
+ iov[0].iov_base = &tmpbuf;
+ iov[0].iov_len = sizeof(tmpbuf);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
+ msg.msg_name = &srx;
+ msg.msg_namelen = sizeof(srx);
+ msg.msg_control = control;
+ msg.msg_controllen = sizeof(control);
+ msg.msg_flags = 0;
+
+ ret = recvmsg(conn->fd, &msg, MSG_PEEK | MSG_DONTWAIT);
+ debug("RECVMSG PEEK: %d\n", ret);
+ if (ret == -1) {
+ if (errno == EAGAIN ||
+ errno == ENODATA)
+ return 0;
+ return -1;
+ }
+ //dump_cmsg(&msg);
+
+ /* Find the call ID. */
+ call = NULL;
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ unsigned char *p;
+
+ if (cmsg->cmsg_level != SOL_RXRPC ||
+ cmsg->cmsg_type != RXRPC_USER_CALL_ID)
+ continue;
+
+ p = CMSG_DATA(cmsg);
+ call = *(struct kafs_call_handle **)p;
+ goto found_call;
+ }
+ abort();
+
+ found_call:
+ if (!call->known_to_kernel) {
+ fprintf(stderr, "Unexpected packet for dead call\n");
+ abort();
+ }
+ call->data_ready_func(call, call->caller_data);
+ }
+}
+
+/*
+ * Receive data from a transport.
+ */
+static int af_rxrpc_recv_data(struct kafs_call_handle *call,
+ struct iovec *iov,
+ int ioc,
+ enum kafs_recv_condition *_cond,
+ uint32_t *_supplementary_info)
+{
+ struct sockaddr_rxrpc srx;
+ struct cmsghdr *cmsg;
+ struct msghdr msg;
+ unsigned char control[128];
+ enum kafs_recv_condition cond;
+ uint32_t supplementary_info;
+ int ret;
+
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = iov;
+ msg.msg_iovlen = ioc;
+ msg.msg_name = &srx;
+ msg.msg_namelen = sizeof(srx);
+ msg.msg_control = control;
+ msg.msg_controllen = sizeof(control);
+ msg.msg_flags = 0;
+
+ ret = recvmsg(call->conn->fd, &msg, MSG_DONTWAIT);
+ debug("RECVMSG: %d\n", ret);
+ if (ret == -1)
+ return -1;
+
+ debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags);
+ debug("CMSG: %zu\n", msg.msg_controllen);
+ debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len);
+
+ /* Process the metadata */
+ supplementary_info = 0;
+ if (msg.msg_flags & MSG_MORE)
+ cond = kafs_recv_more;
+ else if (msg.msg_flags & MSG_EOR)
+ cond = kafs_recv_complete;
+ else
+ cond = kafs_recv_complete_now_send;
+
+ if (msg.msg_flags & MSG_EOR) {
+ debug("Detected EOR\n");
+ call->known_to_kernel = 0;
+ }
+
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ unsigned char *p;
+ int n;
+
+ if (cmsg->cmsg_level != SOL_RXRPC)
+ continue;
+
+ n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
+ p = CMSG_DATA(cmsg);
+
+ switch (cmsg->cmsg_type) {
+ case RXRPC_USER_CALL_ID:
+ if (*(struct kafs_call_handle **)p != call) {
+ fprintf(stderr, "Message queue changed from MSG_PEEK\n");
+ abort();
+ }
+ break;
+
+ case RXRPC_ABORT:
+ if (n == 4)
+ memcpy(&supplementary_info, p, 4);
+ errno = ECONNABORTED;
+ cond = kafs_recv_abort;
+ break;
+
+ case RXRPC_NET_ERROR:
+ if (n != sizeof(ret)) {
+ errno = EBADMSG;
+ return -1;
+ }
+ memcpy(&supplementary_info, p, 4);
+ cond = kafs_recv_net_error;
+ break;
+
+ case RXRPC_LOCAL_ERROR:
+ if (n != sizeof(ret)) {
+ errno = EBADMSG;
+ return -1;
+ }
+ memcpy(&supplementary_info, p, 4);
+ cond = kafs_recv_local_error;
+ break;
+
+ case RXRPC_BUSY:
+ cond = kafs_recv_busy;
+ break;
+
+ case RXRPC_ACK:
+ cond = kafs_recv_final_ack;
+ break;
+
+ case RXRPC_RESPONSE:
+ break;
+
+ default:
+ fprintf(stderr, "Unexpected CMSG type %x\n", cmsg->cmsg_type);
+ break;
+ }
+ }
+
+ *_cond = cond;
+ *_supplementary_info = supplementary_info;
+ debug("<--af_rxrpc_recv_data() = %d [c=%u s=%d]\n",
+ ret, cond, supplementary_info);
+ return ret;
+}
+
+/*
+ * Abort a call.
+ */
+static void af_rxrpc_abort_call(struct kafs_call_handle *call, uint32_t abort_code)
+{
+ struct msghdr msg;
+ size_t ctrllen;
+ unsigned char control[128];
+
+ memset(control, 0, sizeof(control));
+ ctrllen = 0;
+ RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
+ RXRPC_ADD_ABORT(control, ctrllen, abort_code);
+
+ msg.msg_name = &call->conn->peer;
+ msg.msg_namelen = sizeof(call->conn->peer);
+ msg.msg_iov = NULL;
+ msg.msg_iovlen = 0;
+ msg.msg_control = control;
+ msg.msg_controllen = ctrllen;
+ msg.msg_flags = 0;
+
+ sendmsg(call->conn->fd, &msg, 0);
+}
+
+/*
+ * Terminate a call.
+ */
+static void af_rxrpc_terminate_call(struct kafs_call_handle *call)
+{
+ if (usage_put_and_test(&call->usage)) {
+ af_rxrpc_close_connection(call->conn);
+ free(call);
+ }
+}
+
+/*
+ * Poll a connection to see if it has data available.
+ */
+static int af_rxrpc_poll_connection(struct kafs_connection_handle *conn, int nowait)
+{
+ struct pollfd fds[1];
+ int ret;
+
+ debug("-->poll()\n");
+
+ fds[0].fd = conn->fd;
+ fds[0].events = POLLIN;
+ fds[0].revents = 0;
+
+ ret = poll(fds, 1, nowait ? 0 : -1);
+
+ if (ret == -1) {
+ fprintf(stderr, "Poll failed: %m\n");
+ return -1;
+ }
+ if (ret > 0 && af_rxrpc_recv_mux(conn)) {
+ fprintf(stderr, "rxrpc_recv_mux failed: %m\n");
+ return -1;
+ }
+
+ return 0;
+}
+
+struct kafs_transport kafs_transport = {
+ .id = kafs_transport_id,
+ .name = "AF_RXRPC",
+ .init_transport = af_rxrpc_init_transport,
+ .free_transport = af_rxrpc_free_transport,
+ .new_security = af_rxrpc_new_security,
+ .free_security = af_rxrpc_free_security,
+ .open_connection = af_rxrpc_open_connection,
+ .close_connection = af_rxrpc_close_connection,
+ .make_call = af_rxrpc_make_call,
+ .new_service = NULL,
+ .shutdown_service = NULL,
+ .accept_call = NULL,
+ .abort_call = af_rxrpc_abort_call,
+ .terminate_call = af_rxrpc_terminate_call,
+ .send_data = af_rxrpc_send_data,
+ .recv_data = af_rxrpc_recv_data,
+ .poll_connection = af_rxrpc_poll_connection,
+};
from kafs.exception import AFSArgumentError, AFSHelpFlag
from kafs.lib.output import set_verbosity
-def get_cell(switch, params):
+def get_cell(switch, params, transport):
from kafs.lib.cell import cell
- return cell(params[0])
+ return cell(transport, params[0])
-def get_bosserver(switch, params):
+def get_bosserver(switch, params, transport):
from kafs.lib.bosserver import bosserver
return bosserver(params[0])
-def get_fileserver(switch, params):
+def get_fileserver(switch, params, transport):
from kafs.lib.fileserver import fileserver
return fileserver(params[0])
-def get_volserver(switch, params):
+def get_volserver(switch, params, transport):
from kafs.lib.volserver import volserver
return volserver(params[0])
-def get_vlservers(switch, params):
+def get_vlservers(switch, params, transport):
from kafs.lib.vlserver import vlserver
servers = []
for i in params:
servers.append(vlserver(params[0]))
return servers
-def get_volume_name(switch, params):
+def get_volume_name(switch, params, transport):
return params[0]
-def get_volume_names(switch, params):
+def get_volume_names(switch, params, transport):
return params
-def get_machine_name(switch, params):
+def get_machine_name(switch, params, transport):
return params[0]
-def get_machine_names(switch, params):
+def get_machine_names(switch, params, transport):
return params
-def get_path_name(switch, params):
+def get_path_name(switch, params, transport):
return params[0]
-def get_path_names(switch, params):
+def get_path_names(switch, params, transport):
return params
-def get_file_name(switch, params):
+def get_file_name(switch, params, transport):
return params[0]
-def get_file_names(switch, params):
+def get_file_names(switch, params, transport):
return params
-def get_partition_id(switch, params):
+def get_partition_id(switch, params, transport):
from kafs.lib.partition import part2id
return part2id(params[0])
-def get_auth(switch, params):
+def get_auth(switch, params, transport):
return params
-def get_uuid(switch, params):
+def get_uuid(switch, params, transport):
from kafs.lib.uuid import str2uuid
return str2uuid(params[0])
-def get_string(switch, params):
+def get_string(switch, params, transport):
return params[0]
-def get_strings(switch, params):
+def get_strings(switch, params, transport):
return params
-def get_dummy(switch, params):
+def get_dummy(switch, params, transport):
return params
-def get_verbose(switch, params):
+def get_verbose(switch, params, transport):
set_verbosity()
return params
raise AFSArgumentError("UID/GID identifier is not numeric")
return int(i)
-def get_id(switch, params):
+def get_id(switch, params, transport):
return do_get_id(params[0])
-def get_ids(switch, params):
+def get_ids(switch, params, transport):
ids = []
for i in params:
ids.append(do_get_id(i))
raise AFSArgumentError("UID identifier is a positive integer")
return int(i)
-def get_uid(switch, params):
+def get_uid(switch, params, transport):
return do_get_uid(params[0])
-def get_uids(switch, params):
+def get_uids(switch, params, transport):
ids = []
for i in params:
ids.append(do_get_uid(i))
raise AFSArgumentError("GID identifier is a positive integer")
return int(i)
-def get_gid(switch, params):
+def get_gid(switch, params, transport):
return do_get_gid(params[0])
-def get_gids(switch, params):
+def get_gids(switch, params, transport):
ids = []
for i in params:
ids.append(do_get_gid(i))
#
###############################################################################
def parse_arguments(args, available_arguments, argument_size_limits,
- cant_combine_arguments):
+ cant_combine_arguments, transport):
result = {}
need_switch = False
i = 0 # Given arguments index
# Call the syntax checker
syntax = match[1]
result["raw." + match[0]] = params
- result[match[0]] = syntax(match[0], params)
+ result[match[0]] = syntax(match[0], params, transport)
# Check for missing required arguments
for j in available_arguments:
help = "Set privacy flags or quota for a Protection Database entry"
-def get_privacy_flags(switch, params):
+def get_privacy_flags(switch, params, transport):
s = params[0]
if len(s) != 5:
raise AFSArgumentError("Privacy flag string must be exactly 5 characters")
class cell:
"""Represents an AFS cell. We hold the cell name here and the list of
VL servers once we've looked it up"""
- def __init__(self, name = None):
+ def __init__(self, transport, name = None):
if name == None:
name = linecache.getline("/etc/afs/ThisCell", 1)
name = name.strip()
verbose("New Cell ", name, "\n")
self.__name = name
+ self.__transport = transport
self.__looked_up = False
self.__vlserver_names = dict()
self.__vlservers = []
for vladdr in self.query_vl_addrs():
verbose("Trying vlserver ", vladdr, "\n")
- z_conn = rpc.rx_new_connection(str(vladdr), rpc.VL_PORT, rpc.VL_SERVICE,
+ z_conn = rpc.rx_new_connection(self.__transport,
+ str(vladdr), rpc.VL_PORT, rpc.VL_SERVICE,
key, security)
try:
ret = rpc.VL_Probe(z_conn)
key, security = self.determine_security(params)
verbose("Trying volserver ", server.addr(), "\n")
- vol_conn = rpc.rx_new_connection(str(server.addr()),
+ vol_conn = rpc.rx_new_connection(self.__transport,
+ str(server.addr()),
rpc.VOLSERVICE_PORT,
rpc.VOLSERVICE_ID,
key, security)
key, security = self.determine_security(params)
verbose("Trying bosserver ", server.addr(), "\n")
- bos_conn = rpc.rx_new_connection(str(server.addr()),
+ bos_conn = rpc.rx_new_connection(self.__transport,
+ str(server.addr()),
rpc.BOSSERVICE_PORT,
rpc.BOSSERVICE_ID,
key, security)
verbose("Trying ptserver ", server, "\n")
- pt_conn = rpc.rx_new_connection(str(server),
+ pt_conn = rpc.rx_new_connection(self.__transport,
+ str(server),
rpc.PR_PORT,
rpc.PR_SERVICE,
key, security)
# Parse the bos setrestart -time argument
#
###############################################################################
-def parse_restart_time(switch, params):
+def parse_restart_time(switch, params, transport):
at = rpc.bozo_netKTime()
time = params[0].strip().lower()
#
-# AFS Toolset command switcher
+# AFS Toolset command switcher
# -*- coding: utf-8 -*-
#
import kafs.commands
from kafs.lib.output import *
from kafs.exception import AFSArgumentError, AFSHelpFlag
+import kafs.rpc as rpc
###############################################################################
#
else:
argument_size_limits = {}
+ # Load the transport module
+ rpc.load_kafs_transport("/usr/local/lib64/kafs_af_rxrpc.so")
+
+ # Set up a transport
+ transport = rpc.rx_new_transport()
+
# Parse the parameters
try:
params = kafs.argparse.parse_arguments(sys.argv[1:],
- command.command_arguments,
- argument_size_limits,
- cant_combine_arguments)
+ command.command_arguments,
+ argument_size_limits,
+ cant_combine_arguments,
+ transport)
except AFSArgumentError as e:
print(prog + ":", e, file=sys.stderr)
sys.exit(2)
# Stick in the default cell if there isn't one
if "cell" not in params:
from kafs.lib.cell import cell
- params["cell"] = cell()
+ params["cell"] = cell(transport)
params["_prog"] = prog
params["_cmdsetmod"] = cmdsetmod
kafs_py_rx_new_connection(PyObject *_self, PyObject *args)
{
struct py_rx_connection *obj;
- struct rx_connection *z_conn;
+ struct py_rx_transport *trans;
+ struct rx_connection *conn;
union {
struct sockaddr sa;
struct sockaddr_in sin;
uint16_t port, service, local_port = 0, local_service = 0;
int exclusive = 0, security = 0;
- if (!PyArg_ParseTuple(args, "sHHzi|HHp",
+ if (!PyArg_ParseTuple(args, "O!sHHzi|HHp",
+ &py_rx_transportType, &trans,
&address, &port, &service, &key, &security,
&local_port, &local_service, &exclusive))
return NULL;
py_rx_connection_init((PyObject *)obj, NULL, NULL);
assert(obj->x == NULL);
- z_conn = rx_new_connection(&sa.sa, salen, service,
- local_port, local_service, exclusive,
- key, security);
- if (!z_conn) {
+ conn = rx_new_connection(trans->x, &sa.sa, salen, service,
+ exclusive, key, security);
+ if (!conn) {
Py_DECREF(obj);
return errno == ENOMEM ? PyExc_MemoryError :
PyErr_SetFromErrno(PyExc_IOError);
}
- obj->x = z_conn;
+ obj->x = conn;
return (PyObject *)obj;
}
needed = call->blob_size - call->blob_offset;
segment = call->data_stop - call->data_cursor;
- debug("DEC STR dc=%u bsize=%u seg=%u\n", call->data_count, needed, segment);
+ debug("DEC STR dc=%u bsize=%u-%u seg=%lu-%lu\n",
+ call->data_count, call->blob_offset, call->blob_size,
+ call->data_cursor - call->data_start, call->data_stop - call->data_start);
if (segment > 0) {
if (segment > needed)
segment = needed;
+ debug("DEC STR - seg %u\n", segment);
if (call->blob != &rxgen_dec_padding_sink) {
for (i = 0; i < segment; i++)
PyUnicode_WRITE(PyUnicode_KIND(str), PyUnicode_DATA(str),
call->blob_decoded += segment;
call->blob_offset += segment;
call->data_cursor += segment;
- if (call->blob_size <= call->blob_offset) {
+ if (call->blob_offset >= call->blob_size) {
+ if (call->blob_offset > call->blob_size) {
+ debug("DEC STR buffer overrun\n");
+ abort();
+ }
if (call->blob != &rxgen_dec_padding_sink && call->padding_size > 0) {
/* Soak up the padding to a 32-bit boundary */
+ debug("DEC STR padding\n");
call->blob = &rxgen_dec_padding_sink;
call->blob_size = call->padding_size;
call->blob_offset = 0;
{
PyObject *str;
- debug("INIT STRING %u\n", call->blob_size);
-
str = PyUnicode_New(call->blob_size, 255);
if (!str)
return -1;
call->blob = str;
call->blob_offset = 0;
call->padding_size = calc_pad_to_4(call->blob_size);
+ debug("INIT STRING %u,%u\n", call->blob_size, call->padding_size);
return 1;
}
#include "rxgen.h"
+struct py_rx_transport {
+ PyObject_HEAD
+ struct kafs_transport_handle *x;
+};
+
struct py_rx_connection {
PyObject_HEAD
struct rx_connection *x;
bool receiving_data;
};
+extern PyTypeObject py_rx_transportType;
extern PyTypeObject py_rx_connectionType;
+extern PyObject *kafs_py_load_kafs_transport(PyObject *, PyObject *);
+extern PyObject *kafs_py_rx_new_transport(PyObject *, PyObject *);
extern PyObject *kafs_py_rx_new_connection(PyObject *, PyObject *);
extern PyObject *kafs_py_string_to_key(PyObject *, PyObject *);
#include <Python.h>
#include "structmember.h"
-#include <poll.h>
#include <arpa/inet.h>
#include <assert.h>
#include "py_rxgen.h"
static int py_rx_split_do_recv(struct rx_call *call,
struct py_rx_split_info *split_info)
{
- struct pollfd fds[1];
-
- fds[0].fd = call->conn->fd;
- fds[0].events = POLLIN;
- fds[0].revents = 0;
- poll(fds, 1, 0);
- if (fds[0].revents & POLLIN)
- rxrpc_recv_data(call->conn, true);
+ rxrpc_maybe_recv_data(call->conn);
return 0;
}
--- /dev/null
+/* Python RxRPC transport container object
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#include <Python.h>
+#include "structmember.h"
+#include <arpa/inet.h>
+#include "py_rxgen.h"
+#include "rxgen.h"
+
+/*
+ * RxRPC transport container.
+ */
+static int
+py_rx_transport_init(PyObject *_self, PyObject *args, PyObject *kwds)
+{
+ struct py_rx_transport *self = (struct py_rx_transport *)_self;
+ self->x = NULL;
+ return 0;
+}
+
+static void
+py_rx_transport_dealloc(struct py_rx_transport *self)
+{
+ if (self->x) {
+ rx_free_transport(self->x);
+ self->x = NULL;
+ }
+ Py_TYPE(self)->tp_free((PyObject *)self);
+}
+
+PyTypeObject py_rx_transportType = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "kafs.rx_transport", /*tp_name*/
+ sizeof(struct py_rx_transport), /*tp_basicsize*/
+ 0, /*tp_itemsize*/
+ (destructor)py_rx_transport_dealloc, /*tp_dealloc*/
+ 0, /*tp_print*/
+ 0, /*tp_getattr*/
+ 0, /*tp_setattr*/
+ 0, /*tp_compare*/
+ 0, /*tp_repr*/
+ 0, /*tp_as_number*/
+ 0, /*tp_as_sequence*/
+ 0, /*tp_as_mapping*/
+ 0, /*tp_hash */
+ 0, /*tp_call*/
+ 0, /*tp_str*/
+ 0, /*tp_getattro*/
+ 0, /*tp_setattro*/
+ 0, /*tp_as_buffer*/
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /*tp_flags*/
+ "RxRPC transport container", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ 0, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ py_rx_transport_init, /* tp_init */
+ 0, /* tp_alloc */
+ 0, /* tp_new */
+};
+
+/*
+ * Set up an RxRPC transport.
+ */
+PyObject *
+kafs_py_rx_new_transport(PyObject *_self, PyObject *args)
+{
+ struct py_rx_transport *obj;
+ struct kafs_transport_handle *trans;
+ union {
+ struct sockaddr sa;
+ struct sockaddr_in sin;
+ struct sockaddr_in6 sin6;
+ } sa;
+ const char *local_address = NULL;
+ socklen_t salen;
+ uint16_t local_port = 0;
+
+ if (!PyArg_ParseTuple(args, "|Hs", &local_port, &local_address))
+ return NULL;
+
+ memset(&sa, 0, sizeof(sa));
+ if (!local_address && !local_port) {
+ sa.sin.sin_family = AF_INET;
+ salen = sizeof(sa.sin);
+ } else if (!local_address) {
+ sa.sin.sin_family = AF_INET;
+ sa.sin.sin_port = htons(local_port);
+ salen = sizeof(sa.sin);
+ } else if (inet_pton(AF_INET, local_address, &sa.sin.sin_addr)) {
+ sa.sin.sin_family = AF_INET;
+ sa.sin.sin_port = htons(local_port);
+ salen = sizeof(sa.sin);
+ } else if (inet_pton(AF_INET6, local_address, &sa.sin.sin_addr)) {
+ sa.sin6.sin6_family = AF_INET6;
+ sa.sin6.sin6_port = htons(local_port);
+ salen = sizeof(sa.sin6);
+ } else {
+ return PyErr_Format(PyExc_RuntimeError,
+ "Unsupported network address '%s'", local_address);
+ }
+
+ obj = (struct py_rx_transport *)_PyObject_New(&py_rx_transportType);
+ if (!obj)
+ return PyExc_MemoryError;
+ py_rx_transport_init((PyObject *)obj, NULL, NULL);
+ assert(obj->x == NULL);
+
+ trans = rx_new_transport(&sa.sa, salen);
+ if (!trans) {
+ Py_DECREF(obj);
+ return errno == ENOMEM ? PyExc_MemoryError :
+ PyErr_SetFromErrno(PyExc_IOError);
+ }
+ obj->x = trans;
+ return (PyObject *)obj;
+}
+
+/*
+ * Load a kAFS transport module.
+ */
+PyObject *
+kafs_py_load_kafs_transport(PyObject *_self, PyObject *args)
+{
+ const char *name = NULL;
+ int ret;
+
+ if (!PyArg_ParseTuple(args, "s", &name))
+ return NULL;
+
+ ret = load_kafs_transport_module(name);
+ if (ret < 0)
+ return errno == ENOMEM ? PyExc_MemoryError :
+ PyErr_SetFromErrno(PyExc_IOError);
+ Py_RETURN_TRUE;
+}
#ifndef _RXGEN_H
#define _RXGEN_H
-#include "af_rxrpc.h"
+#include "transport.h"
#include <stdbool.h>
#include <errno.h>
#include <stdlib.h>
+#include <arpa/inet.h>
typedef uint32_t net_xdr_t;
struct rx_connection {
- struct sockaddr_rxrpc peer;
+ struct kafs_connection_handle *handle;
uint32_t last_abort_code;
- int fd;
};
#define RXGEN_BUFFER_SIZE 1024
struct rx_call {
uint32_t magic;
+ struct kafs_call_handle *handle;
struct rx_connection *conn;
+ struct sockaddr sa;
+ socklen_t sa_len;
enum rx_call_state state;
unsigned known_to_kernel : 1;
unsigned secured : 1;
extern void rxrpc_dec_advance_buffer(struct rx_call *call);
extern int rxgen_dec_discard_excess(struct rx_call *call);
-extern struct rx_connection *rx_new_connection(const struct sockaddr *sa,
+extern int load_kafs_transport_module(const char *name);
+
+extern struct kafs_transport_handle *rx_new_transport(const struct sockaddr *local_sa,
+ socklen_t local_salen);
+extern void rx_free_transport(struct kafs_transport_handle *trans);
+
+extern struct rx_connection *rx_new_connection(struct kafs_transport_handle *trans,
+ const struct sockaddr *sa,
socklen_t salen,
uint16_t service,
- uint16_t local_port,
- uint16_t local_service,
int exclusive,
- const char *key,
- int security);
+ const char *cellname,
+ enum kafs_connection_auth_level level);
-extern void rx_close_connection(struct rx_connection *z_conn);
+extern void rx_close_connection(struct rx_connection *conn);
-extern struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn, int incoming_call);
+extern struct rx_call *rxrpc_make_call(struct rx_connection *conn);
extern void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code);
extern void rxrpc_terminate_call(struct rx_call *call, uint32_t abort_code);
extern int rxrpc_send_data(struct rx_call *call);
-extern int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait);
+extern int rxrpc_recv_data(struct rx_connection *conn, bool nowait);
+extern int rxrpc_maybe_recv_data(struct rx_connection *conn);
extern int rxrpc_run_sync_call(struct rx_call *call);
#endif /* _RXGEN_H */
-/* AF_RXRPC driver
+/* Dynamically load the Rx transport module
*
- * Copyright (C) 2014 Red Hat, Inc. All Rights Reserved.
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
* Written by David Howells (dhowells@redhat.com)
*
* This program is free software; you can redistribute it and/or
* 2 of the Licence, or (at your option) any later version.
*/
-#define _XOPEN_SOURCE
+#include <dlfcn.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include <unistd.h>
-#include <poll.h>
#include <errno.h>
#include <limits.h>
-#include <sys/socket.h>
-#include "af_rxrpc.h"
+#include <netinet/in.h>
+#include "transport.h"
#include "rxgen.h"
#define RXGEN_CALL_MAGIC 0x52584745U
#define RXGEN_BUF_MAGIC (0x52420000U | __LINE__)
#define RXGEN_BUF_DEAD (0x6b6bU | __LINE__)
-#define debug(fmt, ...) do { if (0) printf(fmt, ## __VA_ARGS__); } while (0)
+#define debug(fmt, ...) do { if (0) printf("TRAN: "fmt, ## __VA_ARGS__); } while (0)
uint32_t rxgen_dec_padding_sink;
+static struct kafs_transport *transport;
+static char transport_id[] = kafs_transport_id;
+
+static void rxrpc_data_ready(struct kafs_call_handle *call_handle, void *caller_data);
+
/*
- * dump the control messages
+ * Load a transport module and extract the entry points
*/
-static __attribute__((unused))
-void dump_cmsg(struct msghdr *msg)
+int load_kafs_transport_module(const char *name)
{
- struct cmsghdr *cmsg;
- unsigned long user_id;
- unsigned char *p;
- int abort_code;
- int n;
-
- for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
- n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
- p = CMSG_DATA(cmsg);
-
- printf("CMSG: %zu: ", cmsg->cmsg_len);
-
- if (cmsg->cmsg_level == SOL_RXRPC) {
- switch (cmsg->cmsg_type) {
- case RXRPC_USER_CALL_ID:
- printf("RXRPC_USER_CALL_ID: ");
- if (n != sizeof(user_id))
- goto dump_data;
- memcpy(&user_id, p, sizeof(user_id));
- printf("%lx\n", user_id);
- continue;
+ void *trans;
+ char *err;
- case RXRPC_ABORT:
- printf("RXRPC_ABORT: ");
- if (n != sizeof(abort_code))
- goto dump_data;
- memcpy(&abort_code, p, sizeof(abort_code));
- printf("%d\n", abort_code);
- continue;
+ trans = dlopen(name, RTLD_LAZY);
+ if (!trans) {
+ err = dlerror();
+ goto error;
+ }
- case RXRPC_ACK:
- printf("RXRPC_ACK");
- if (n != 0)
- goto dump_data_colon;
- goto print_nl;
-
- case RXRPC_RESPONSE:
- printf("RXRPC_RESPONSE");
- if (n != 0)
- goto dump_data_colon;
- goto print_nl;
-
- case RXRPC_NET_ERROR:
- printf("RXRPC_NET_ERROR: ");
- if (n != sizeof(abort_code))
- goto dump_data;
- memcpy(&abort_code, p, sizeof(abort_code));
- printf("%s\n", strerror(abort_code));
- continue;
+ dlerror(); /* clear error */
+ transport = dlsym(trans, "kafs_transport");
+ err = dlerror();
+ if (err)
+ goto error_2;
+
+ if (memcmp(transport->id, transport_id, sizeof(transport_id)) != 0) {
+ fprintf(stderr, "Transport '%s' '%s' %zu\n",
+ transport->id, transport_id, sizeof(transport_id));
+ err = "Transport has incorrect ID string\n";
+ goto error_2;
+ }
+ return 0;
- case RXRPC_BUSY:
- printf("RXRPC_BUSY");
- if (n != 0)
- goto dump_data_colon;
- goto print_nl;
-
- case RXRPC_LOCAL_ERROR:
- printf("RXRPC_LOCAL_ERROR: ");
- if (n != sizeof(abort_code))
- goto dump_data;
- memcpy(&abort_code, p, sizeof(abort_code));
- printf("%s\n", strerror(abort_code));
- continue;
+error_2:
+ dlclose(trans);
+error:
+ fprintf(stderr, "%s: %s\n", name, err);
+ errno = -ELIBBAD;
+ return -1;
+}
- default:
- break;
- }
- }
+/*
+ * Set up a new transport
+ */
+struct kafs_transport_handle *rx_new_transport(const struct sockaddr *local_sa,
+ socklen_t local_salen)
+{
- printf("l=%d t=%d", cmsg->cmsg_level, cmsg->cmsg_type);
+ if (!transport) {
+ fprintf(stderr, "No Rx transport loaded\n");
+ errno = EINVAL;
+ return NULL;
+ }
- dump_data_colon:
- printf(": ");
- dump_data:
- printf("{");
- for (; n > 0; n--, p++)
- printf("%02x", *p);
+ return transport->init_transport(local_sa, local_salen);
+}
- print_nl:
- printf("}\n");
- }
+/*
+ * Destroy a transport
+ */
+void rx_free_transport(struct kafs_transport_handle *trans)
+{
+ transport->free_transport(trans);
}
/*
* Set up a new connection
*/
-struct rx_connection *rx_new_connection(const struct sockaddr *sa,
+struct rx_connection *rx_new_connection(struct kafs_transport_handle *trans_handle,
+ const struct sockaddr *sa,
socklen_t salen,
uint16_t service,
- uint16_t local_port,
- uint16_t local_service,
int exclusive,
- const char *key,
- int security)
+ const char *cellname,
+ enum kafs_connection_auth_level level)
{
- struct sockaddr_rxrpc srx;
- struct rx_connection *z_conn;
- int ret;
-
- z_conn = calloc(1, sizeof(*z_conn));
- if (!z_conn)
- return NULL;
-
- z_conn->peer.srx_family = AF_RXRPC;
- z_conn->peer.srx_service = service;
- z_conn->peer.transport_type = SOCK_DGRAM;
- z_conn->peer.transport_len = sizeof(srx.transport.sin);
-
+ struct rx_connection *conn;
+ const struct sockaddr_in *sin;
+ const struct sockaddr_in6 *sin6;
+ struct kafs_security_handle *security;
+
+ /* Do some basic parameter checking */
switch (sa->sa_family) {
case 0:
errno = EDESTADDRREQ;
- goto error_conn;
- case AF_INET:
- if (salen != sizeof(struct sockaddr_in))
- goto inval;
- break;
- case AF_INET6:
- if (salen != sizeof(struct sockaddr_in6))
- goto inval;
- break;
- default:
- errno = EPROTOTYPE;
- goto error_conn;
- }
-
- if (security < RXRPC_SECURITY_PLAIN ||
- security > RXRPC_SECURITY_ENCRYPT)
- goto inval;
-
- memcpy(&z_conn->peer.transport, sa, salen);
- switch (sa->sa_family) {
+ return NULL;
case AF_INET:
- if (!z_conn->peer.transport.sin.sin_port) {
+ if (salen != sizeof(struct sockaddr_in)) {
+ errno = EINVAL;
+ return NULL;
+ }
+ sin = (const struct sockaddr_in *)sa;
+ if (!sin->sin_port) {
errno = EDESTADDRREQ;
- goto error_conn;
+ return NULL;
}
break;
case AF_INET6:
- if (!z_conn->peer.transport.sin6.sin6_port) {
+ if (salen != sizeof(struct sockaddr_in6)) {
+ errno = EINVAL;
+ return NULL;
+ }
+ sin6 = (const struct sockaddr_in6 *)sa;
+ if (!sin6->sin6_port) {
errno = EDESTADDRREQ;
- goto error_conn;
+ return NULL;
}
break;
+ default:
+ errno = EPROTOTYPE;
+ return NULL;
}
- /* Open up a socket for talking to the AF_RXRPC module */
- z_conn->fd = socket(AF_RXRPC, SOCK_DGRAM, PF_INET);
- if (z_conn->fd < 0)
- goto error_conn;
-
- if (exclusive) {
- ret = setsockopt(z_conn->fd,
- SOL_RXRPC, RXRPC_EXCLUSIVE_CONNECTION,
- NULL, 0);
- if (ret == -1)
- goto error_conn;
+ if (level < kafs_connection_no_auth ||
+ level > kafs_connection_encrypt) {
+ errno = EINVAL;
+ return NULL;
}
- if (key) {
- ret = setsockopt(z_conn->fd, SOL_RXRPC, RXRPC_MIN_SECURITY_LEVEL,
- &security, sizeof(security));
- if (ret == -1)
- goto error_conn;
-
- ret = setsockopt(z_conn->fd, SOL_RXRPC, RXRPC_SECURITY_KEY,
- key, strlen(key));
- if (ret == -1)
- goto error_conn;
- }
+ /* Set up our connection tracking */
+ conn = calloc(1, sizeof(*conn));
+ if (!conn)
+ return NULL;
- /* Bind an address to the local endpoint */
- memset(&srx, 0, sizeof(srx));
- srx.srx_family = AF_RXRPC;
- srx.srx_service = local_service;
- srx.transport_type = SOCK_DGRAM;
- srx.transport_len = salen;
- srx.transport.sin.sin_family = sa->sa_family;
- switch (sa->sa_family) {
- case AF_INET:
- srx.transport.sin.sin_port = htons(local_port);
- break;
- case AF_INET6:
- srx.transport.sin6.sin6_port = htons(local_port);
- break;
- }
+ /* Create a security handle */
+ security = transport->new_security(trans_handle, cellname, level);
+ if (!security)
+ goto error_conn;
- ret = bind(z_conn->fd, (struct sockaddr *)&srx, sizeof(srx));
- if (ret < 0)
- goto error_fd;
+ /* Open the transport connection */
+ conn->handle = transport->open_connection(trans_handle,
+ security,
+ sa, salen,
+ service,
+ exclusive);
+ transport->free_security(security);
+ if (!conn->handle)
+ goto error_conn;
- return z_conn;
+ return conn;
-inval:
- errno = EINVAL;
- goto error_conn;
-error_fd:
- close(z_conn->fd);
error_conn:
- free(z_conn);
+ free(conn);
return NULL;
}
* Close an RxRPC client connection. This will cause all outstanding
* operations to be aborted by the kernel..
*/
-void rx_close_connection(struct rx_connection *z_conn)
+void rx_close_connection(struct rx_connection *conn)
{
- close(z_conn->fd);
- free(z_conn);
+ transport->close_connection(conn->handle);
+ free(conn);
}
/*
- * Allocate a call structure. It is given one buffer ready to go.
+ * Create a call structure with which to make a call. It is given one buffer
+ * ready to go.
*/
-struct rx_call *rxrpc_alloc_call(struct rx_connection *z_conn,
- int incoming_call)
+struct rx_call *rxrpc_make_call(struct rx_connection *conn)
{
struct rx_call *call;
struct rx_buf *buf;
call = calloc(1, sizeof(struct rx_call));
if (!call)
- return NULL;
+ goto error_0;
buf = calloc(1, sizeof(struct rx_buf));
- if (!buf) {
- free(call);
- return NULL;
- }
+ if (!buf)
+ goto error_1;
data = malloc(RXGEN_BUFFER_SIZE);
- if (!data) {
- free(buf);
- free(call);
- return NULL;
- }
+ if (!data)
+ goto error_2;
+
+ buf->magic = RXGEN_BUF_MAGIC;
+ buf->buf = data;
+
+ call->magic = RXGEN_CALL_MAGIC;
+ call->conn = conn;
+ call->state = rx_call_cl_not_started;
+ call->more_send = true;
+ call->more_recv = true;
+ call->buffer_head = buf;
+ call->buffer_tail = buf;
+ call->data_start = data;
+ call->data_cursor = data;
+ call->data_stop = data + RXGEN_BUFFER_SIZE;
+ call->buffer_space = RXGEN_BUFFER_SIZE;
+
+ call->handle = transport->make_call(conn->handle, rxrpc_data_ready, call);
+ if (!call->handle)
+ goto error_3;
debug("Alloc: buf=%p data=%p\n", buf, data);
+ return call;
+
+error_3:
+ free(data);
+error_2:
+ free(buf);
+error_1:
+ free(call);
+error_0:
+ return NULL;
+}
+
+/*
+ * Create a call structure with which to accept a call. It is given one buffer
+ * ready to go.
+ */
+#if 0
+struct rx_call *rxrpc_accept_call(struct kafs_service_handle *service_handle,
+ void *caller_data,
+ const struct sockaddr *peer,
+ socklen_t peerlen)
+{
+ struct rx_service *service = caller_data;
+ struct rx_call *call;
+ struct rx_buf *buf;
+ uint8_t *data;
+
+ call = calloc(1, sizeof(struct rx_call));
+ if (!call)
+ goto error_0;
+
+ buf = calloc(1, sizeof(struct rx_buf));
+ if (!buf)
+ goto error_1;
+
+ data = malloc(RXGEN_BUFFER_SIZE);
+ if (!data)
+ goto error_2;
buf->magic = RXGEN_BUF_MAGIC;
buf->buf = data;
- if (incoming_call)
- call->state = rx_call_sv_not_started;
- else
- call->state = rx_call_cl_not_started;
-
call->magic = RXGEN_CALL_MAGIC;
- call->conn = z_conn;
+ call->conn = conn;
+ call->state = rx_call_sv_not_started;
call->more_send = true;
call->more_recv = true;
call->buffer_head = buf;
call->buffer_tail = buf;
call->data_start = data;
call->data_cursor = data;
- call->data_stop = incoming_call ? data : data + RXGEN_BUFFER_SIZE;
+ call->data_stop = data;
call->buffer_space = RXGEN_BUFFER_SIZE;
+
+ call->handle = transport->accept_call(service_handle, peer, peerlen);
+ if (!call->handle)
+ goto error_3;
+
+ debug("Alloc: buf=%p data=%p\n", buf, data);
return call;
+
+error_3:
+ free(data);
+error_2:
+ free(buf);
+error_1:
+ free(call);
+error_0:
+ return NULL;
}
+#endif
static void rxrpc_check_buf(const struct rx_buf *buf)
{
int rxrpc_send_data(struct rx_call *call)
{
struct rx_buf *cursor;
- struct msghdr msg;
- size_t ctrllen, i;
- unsigned char control[128];
struct iovec iov[16];
unsigned more;
int ioc, ret;
}
more_to_send:
- /* Request an operation */
- ctrllen = 0;
- RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
-
- msg.msg_name = &call->conn->peer;
- msg.msg_namelen = sizeof(call->conn->peer);
- msg.msg_iov = iov;
- msg.msg_control = control;
- msg.msg_controllen = ctrllen;
- msg.msg_flags = 0;
-
/* We may have a completely sent buffer on the front of the queue if
* this was the last buffer on the last send. The buffer queue isn't
* allowed to be empty at any point.
free(sent);
}
- more = MSG_MORE;
+ more = 1;
for (ioc = 0; ioc < 16; ioc++) {
rxrpc_check_buf(cursor);
}
cursor = cursor->next;
}
- msg.msg_iovlen = ioc;
-
- /* Send the data */
- //dump_cmsg(&msg);
-
- for (i = 0; i < msg.msg_iovlen; i++)
- debug("IOV[%02zu] %04zu %p\n",
- i, msg.msg_iov[i].iov_len, msg.msg_iov[i].iov_base);
- ret = sendmsg(call->conn->fd, &msg, more);
- debug("SENDMSG: %d%s\n", ret, more ? " [more]" : "");
+ ret = transport->send_data(call->handle, iov, ioc, more);
if (ret == -1)
return -1;
}
/*
- * Receive data from a socket.
+ * Call back from the transport to indicate that data is ready.
*/
-int rxrpc_recv_data(struct rx_connection *z_conn, bool nowait)
+static void rxrpc_data_ready(struct kafs_call_handle *call_handle, void *caller_data)
{
- struct rx_call *call;
+ struct rx_call *call = caller_data;
struct rx_buf *bufs[4] = { NULL }, *cursor;
- struct sockaddr_rxrpc srx;
- struct cmsghdr *cmsg;
- struct msghdr msg;
struct iovec iov[4];
- unsigned char control[128];
- uint32_t tmpbuf[1];
- int ioc, ret;
-
- /* Peek at the next message */
- iov[0].iov_base = &tmpbuf;
- iov[0].iov_len = sizeof(tmpbuf);
-
- memset(&msg, 0, sizeof(msg));
- msg.msg_iov = iov;
- msg.msg_iovlen = 1;
- msg.msg_name = &srx;
- msg.msg_namelen = sizeof(srx);
- msg.msg_control = control;
- msg.msg_controllen = sizeof(control);
- msg.msg_flags = 0;
-
- ret = recvmsg(z_conn->fd, &msg, MSG_PEEK | (nowait ? MSG_DONTWAIT : 0));
- debug("RECVMSG PEEK: %d\n", ret);
- if (ret == -1)
- return -1;
-
- /* Find the call ID. */
- call = NULL;
- for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
- unsigned char *p;
-
- if (cmsg->cmsg_level != SOL_RXRPC ||
- cmsg->cmsg_type != RXRPC_USER_CALL_ID)
- continue;
-
- p = CMSG_DATA(cmsg);
- call = *(struct rx_call **)p;
- break;
- }
- if (!call)
- abort();
-
- /* Now actually retrieve that message */
- memset(&msg, 0, sizeof(msg));
- msg.msg_iov = iov;
- msg.msg_iovlen = 0;
- msg.msg_name = &srx;
- msg.msg_namelen = sizeof(srx);
- msg.msg_control = control;
- msg.msg_controllen = sizeof(control);
- msg.msg_flags = 0;
+ enum kafs_recv_condition cond;
+ uint32_t supplementary_info;
+ int i, ioc, ret;
debug("Recv: buf[0]=%p data[0]=%p (io=%u)\n",
call->buffer_tail, call->buffer_tail->buf, call->buffer_tail->io_cursor);
bufs[0] = call->buffer_tail;
iov[0].iov_base = bufs[0]->buf + bufs[0]->io_cursor;
iov[0].iov_len = RXGEN_BUFFER_SIZE - bufs[0]->io_cursor;
- msg.msg_iovlen = 1;
+ ioc = 1;
} else {
- msg.msg_iovlen = 0;
+ ioc = 0;
}
- for (ioc = msg.msg_iovlen; ioc < 4; ioc++) {
- bufs[ioc] = calloc(1, sizeof(struct rx_buf));
- bufs[ioc]->magic = RXGEN_BUF_MAGIC;
- if (!bufs[ioc]) {
- while (--ioc >= (int)msg.msg_iovlen) {
- bufs[ioc]->magic = RXGEN_BUF_DEAD;
- free(bufs[ioc]->buf);
- free(bufs[ioc]);
+ for (i = ioc; i < 4; i++) {
+ bufs[i] = calloc(1, sizeof(struct rx_buf));
+ bufs[i]->magic = RXGEN_BUF_MAGIC;
+ if (!bufs[i]) {
+ while (--i >= ioc) {
+ bufs[i]->magic = RXGEN_BUF_DEAD;
+ free(bufs[i]->buf);
+ free(bufs[i]);
}
- return -1;
+ return;
}
- bufs[ioc]->buf = malloc(RXGEN_BUFFER_SIZE);
- if (!bufs[ioc]->buf) {
- bufs[ioc]->magic = RXGEN_BUF_DEAD;
- free(bufs[ioc]);
- while (--ioc >= (int)msg.msg_iovlen) {
- bufs[ioc]->magic = RXGEN_BUF_DEAD;
- free(bufs[ioc]->buf);
- free(bufs[ioc]);
+ bufs[i]->buf = malloc(RXGEN_BUFFER_SIZE);
+ if (!bufs[i]->buf) {
+ bufs[i]->magic = RXGEN_BUF_DEAD;
+ free(bufs[i]);
+ while (--i >= ioc) {
+ bufs[i]->magic = RXGEN_BUF_DEAD;
+ free(bufs[i]->buf);
+ free(bufs[i]);
}
- return -1;
+ return;
}
- iov[ioc].iov_base = bufs[ioc]->buf;
- iov[ioc].iov_len = RXGEN_BUFFER_SIZE;
+ iov[i].iov_base = bufs[i]->buf;
+ iov[i].iov_len = RXGEN_BUFFER_SIZE;
}
- msg.msg_iovlen = 4;
+ ioc = 4;
- ret = recvmsg(z_conn->fd, &msg, 0);
- debug("RECVMSG: %d\n", ret);
+ ret = transport->recv_data(call->handle, iov, ioc, &cond, &supplementary_info);
+ debug("RECVDATA: %d\n", ret);
if (ret == -1)
- return -1;
-
- debug("RECV: %d [fl:%x]\n", ret, msg.msg_flags);
- debug("CMSG: %zu\n", msg.msg_controllen);
- debug("IOV: %zu [0]=%zu\n", msg.msg_iovlen, iov[0].iov_len);
+ return;
call->bytes_received += ret;
/* Attach any used buffers to the call and discard the rest */
if (ret > 0) {
- for (ioc = 0; ioc < 4 && ret > 0; ioc++) {
- unsigned added = RXGEN_BUFFER_SIZE - bufs[ioc]->io_cursor;
- debug("xfer[%d] space=%u rem=%d\n", ioc, added, ret);
+ for (i = 0; i < 4 && ret > 0; i++) {
+ unsigned added = RXGEN_BUFFER_SIZE - bufs[i]->io_cursor;
+ debug("xfer[%d] space=%u rem=%d\n", i, added, ret);
if (added > (unsigned)ret)
added = (unsigned)ret;
- bufs[ioc]->io_cursor += added;
+ bufs[i]->io_cursor += added;
call->data_count += added;
ret -= added;
- if (call->buffer_tail == bufs[ioc])
+ if (call->buffer_tail == bufs[i])
continue;
- call->buffer_tail->next = bufs[ioc];
- call->buffer_tail = bufs[ioc];
+ call->buffer_tail->next = bufs[i];
+ call->buffer_tail = bufs[i];
if (ret <= 0) {
- ioc++;
+ i++;
break;
}
}
- for (; ioc < 4; ioc++) {
- bufs[ioc]->magic = RXGEN_BUF_DEAD;
- free(bufs[ioc]->buf);
- free(bufs[ioc]);
+ for (; i < 4; i++) {
+ bufs[i]->magic = RXGEN_BUF_DEAD;
+ free(bufs[i]->buf);
+ free(bufs[i]);
}
}
debug("RecvQ buf=%p data=%p iocur=%u\n",
cursor, cursor->buf, cursor->io_cursor);
- /* Process the metadata */
- if (msg.msg_flags & MSG_EOR)
+ /* Process the call state given to us by the receiver. */
+ switch (cond) {
+ case kafs_recv_more:
+ break;
+ case kafs_recv_final_ack:
+ if (call->state == rx_call_sv_waiting_for_final_ack) {
+ call->state = rx_call_sv_complete;
+ } else {
+ fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n",
+ call->state);
+ abort();
+ }
+ case kafs_recv_complete:
call->known_to_kernel = 0;
- if (!(msg.msg_flags & MSG_MORE))
+ case kafs_recv_complete_now_send:
call->more_recv = false;
+ break;
- for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
- unsigned char *p;
- int n;
-
- if (cmsg->cmsg_level != SOL_RXRPC)
- continue;
-
- n = cmsg->cmsg_len - CMSG_ALIGN(sizeof(*cmsg));
- p = CMSG_DATA(cmsg);
-
- switch (cmsg->cmsg_type) {
- case RXRPC_ABORT:
- if (n != sizeof(call->abort_code))
- call->abort_code = 0;
- else
- memcpy(&call->abort_code, p, sizeof(call->abort_code));
- z_conn->last_abort_code = call->abort_code;
- call->error_code = ECONNABORTED;
- call->state = rx_call_remotely_aborted;
- break;
-
- case RXRPC_NET_ERROR:
- if (n != sizeof(ret)) {
- errno = EBADMSG;
- return -1;
- }
- memcpy(&ret, p, sizeof(ret));
- call->error_code = ret;
- call->state = rx_call_net_error;
- break;
-
- case RXRPC_LOCAL_ERROR:
- if (n != sizeof(ret)) {
- errno = EBADMSG;
- return -1;
- }
- memcpy(&ret, p, sizeof(ret));
- call->error_code = ret;
- call->state = rx_call_local_error;
- break;
-
- case RXRPC_BUSY:
- call->error_code = ECONNREFUSED;
- call->state = rx_call_rejected_busy;
- break;
-
- case RXRPC_ACK:
- if (call->state == rx_call_sv_waiting_for_final_ack) {
- call->state = rx_call_sv_complete;
- } else {
- fprintf(stderr, "RxRPC: Recv-Ack in bad call state (%d)\n",
- call->state);
- abort();
- }
- break;
-
- case RXRPC_RESPONSE:
- call->secured = 1;
- break;
-
- default:
- break;
- }
+ case kafs_recv_abort:
+ call->conn->last_abort_code = supplementary_info;
+ call->abort_code = supplementary_info;
+ call->error_code = ECONNABORTED;
+ call->state = rx_call_remotely_aborted;
+ call->known_to_kernel = 0;
+ call->more_recv = false;
+ break;
+ case kafs_recv_net_error:
+ call->error_code = supplementary_info;
+ call->state = rx_call_net_error;
+ call->known_to_kernel = 0;
+ call->more_recv = false;
+ break;
+ case kafs_recv_local_error:
+ call->error_code = supplementary_info;
+ call->state = rx_call_local_error;
+ call->known_to_kernel = 0;
+ call->more_recv = false;
+ break;
+ case kafs_recv_busy:
+ call->error_code = ECONNREFUSED;
+ call->state = rx_call_rejected_busy;
+ call->known_to_kernel = 0;
+ call->more_recv = false;
+ break;
+ default:
+ abort();
}
/* Switch into appropriate decode state */
* and is willing to settle for nothing also.
*/
if (call->data_count <= 0 && call->more_recv)
- return 0;
+ return;
} else if (call->data_count < call->need_size) {
if (!call->more_recv) {
/* Short data */
call->error_code = ENODATA;
rxrpc_abort_call(call, 1);
}
- return 0;
+ return;
}
/* Handle reception of the opcode in a server-side call. Note
goto loop;
}
+ debug("DEC %u > %u\n", call->data_count, call->need_size);
ret = call->decoder(call);
if (ret < 0) {
rxrpc_abort_call(call, 1);
/* Invoke the processor */
call->processor(call);
- return 0;
+ return;
}
}
break;
case rx_call_sv_processing:
call->failed(call);
- return 0;
+ return;
default:
fprintf(stderr, "RxRPC: Recv in bad call state (%d)\n", call->state);
abort();
}
-
- return 0;
}
/*
*/
void rxrpc_abort_call(struct rx_call *call, uint32_t abort_code)
{
- struct msghdr msg;
- size_t ctrllen;
- unsigned char control[128];
if (call->known_to_kernel) {
- memset(control, 0, sizeof(control));
- ctrllen = 0;
- RXRPC_ADD_CALLID(control, ctrllen, (unsigned long)call);
- RXRPC_ADD_ABORT(control, ctrllen, abort_code);
-
- msg.msg_name = &call->conn->peer;
- msg.msg_namelen = sizeof(call->conn->peer);
- msg.msg_iov = NULL;
- msg.msg_iovlen = 0;
- msg.msg_control = control;
- msg.msg_controllen = ctrllen;
- msg.msg_flags = 0;
-
- sendmsg(call->conn->fd, &msg, 0);
+ transport->abort_call(call->handle, abort_code);
call->known_to_kernel = 0;
}
call->state = rx_call_locally_aborted;
rxrpc_check_call(call);
if (call->known_to_kernel)
rxrpc_abort_call(call, abort_code);
+ transport->terminate_call(call->handle);
call->magic = 0x7a7b7c7d;
for (cursor = call->buffer_head; cursor; cursor = next) {
rxrpc_check_buf(cursor);
free(call);
}
+/*
+ * Poll the transport and receive data if there's anything available.
+ */
+int rxrpc_maybe_recv_data(struct rx_connection *conn)
+{
+ return transport->poll_connection(conn->handle, true);
+}
+
/*
* Run a single call synchronously.
*/
int rxrpc_run_sync_call(struct rx_call *call)
{
- struct pollfd fds[1];
+ int ret;
while (call->known_to_kernel) {
- fds[0].fd = call->conn->fd;
- fds[0].events = POLLIN;
- fds[0].revents = 0;
-
- if (poll(fds, 1, -1) == -1) {
- fprintf(stderr, "Poll failed: %m\n");
- return -1;
- }
- if (rxrpc_recv_data(call->conn, false)) {
- fprintf(stderr, "rxrpc_recv_data failed: %m\n");
- return -1;
- }
+ debug("Run call %d\n", call->known_to_kernel);
+ ret = transport->poll_connection(call->conn->handle, false);
+ if (ret < 0)
+ return ret;
}
switch (call->state) {
*/
cursor = call->buffer_head;
segment = cursor->io_cursor;
+ debug("DEC Advance %u\n", segment);
new_stop = cursor->buf + segment;
if (call->data_stop >= new_stop) {
/* This buffer must then be completely used as we're required to check
* amount received before reading it.
*/
- if (new_stop != cursor->buf + RXGEN_BUFFER_SIZE)
- abort(); /* Didn't completely consume a buffer */
- if (call->buffer_tail == cursor)
- abort(); /* Unexpectedly out of data */
+ if (call->data_stop > new_stop) {
+ debug("buffer overrun (%p > %p)\n", call->data_stop, new_stop);
+ abort();
+ }
+ if (new_stop != cursor->buf + cursor->io_cursor) {
+ debug("buffer not consumed (%p != %p)\n",
+ new_stop, cursor->buf + cursor->io_cursor);
+ abort();
+ }
+ if (call->buffer_tail == cursor) {
+ debug("unexpectedly out of data\n");
+ abort();
+ }
/* Move to the next buffer */
rxrpc_post_dec(call);
--- /dev/null
+/* Access points into the dynamically loaded Rx transport module
+ *
+ * Copyright (C) 2015 Red Hat, Inc. All Rights Reserved.
+ * Written by David Howells (dhowells@redhat.com)
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public Licence
+ * as published by the Free Software Foundation; either version
+ * 2 of the Licence, or (at your option) any later version.
+ */
+
+#ifndef _KAFS_TRANSPORT_H
+#define _KAFS_TRANSPORT_H
+
+#include <stdint.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+
+struct kafs_transport_handle;
+struct kafs_security_handle;
+struct kafs_connection_handle;
+struct kafs_call_handle;
+
+enum kafs_connection_auth_level {
+ kafs_connection_no_auth,
+ kafs_connection_local_auth,
+ kafs_connection_clear,
+ kafs_connection_integrity_only,
+ kafs_connection_encrypt,
+};
+
+enum kafs_recv_condition {
+ kafs_recv_more,
+ kafs_recv_final_ack,
+ kafs_recv_complete,
+ kafs_recv_complete_now_send,
+ kafs_recv_abort,
+ kafs_recv_net_error,
+ kafs_recv_local_error,
+ kafs_recv_busy,
+};
+
+#define kafs_transport_id "kAFS-utils tran"
+
+struct kafs_transport_handle;
+struct kafs_security_handle;
+struct kafs_connection_handle;
+struct kafs_service_handle;
+struct kafs_call_handle;
+
+typedef void (*kafs_incoming_call_func)(struct kafs_service_handle *service,
+ void *caller_data,
+ const struct sockaddr *peer,
+ socklen_t *peerlen);
+
+typedef void (*kafs_data_ready_func)(struct kafs_call_handle *call,
+ void *caller_data);
+
+struct kafs_transport {
+ char id[16]; /* = kafs_transport_id */
+ char name[16]; /* Name of transport */
+
+ struct kafs_transport_handle *(*init_transport)(const struct sockaddr *sa,
+ socklen_t salen);
+
+ void (*free_transport)(struct kafs_transport_handle *transport);
+
+ struct kafs_security_handle *(*new_security)(struct kafs_transport_handle *transport,
+ const char *cell_name,
+ enum kafs_connection_auth_level level);
+
+ void (*free_security)(struct kafs_security_handle *security);
+
+ struct kafs_connection_handle *(*open_connection)(struct kafs_transport_handle *transport,
+ struct kafs_security_handle *security,
+ const struct sockaddr *sa,
+ socklen_t salen,
+ uint16_t service,
+ int exclusive_connection);
+
+ void (*close_connection)(struct kafs_connection_handle *connection);
+
+ struct kafs_call_handle *(*make_call)(struct kafs_connection_handle *connection,
+ kafs_data_ready_func data_ready_func,
+ void *caller_data);
+
+ struct kafs_service_handle *(*new_service)(struct kafs_transport_handle *transport,
+ kafs_incoming_call_func new_call_func,
+ void *caller_data,
+ uint16_t local_port,
+ uint16_t local_service,
+ struct kafs_security_handle **permits);
+
+ void (*shutdown_service)(struct kafs_service_handle *service);
+
+ struct kafs_call_handle *(*accept_call)(struct kafs_service_handle *service,
+ kafs_data_ready_func data_ready_func,
+ void *caller_data,
+ struct sockaddr *_sa,
+ socklen_t *_salen);
+
+ void (*abort_call)(struct kafs_call_handle *call,
+ uint32_t abort_code);
+
+ void (*terminate_call)(struct kafs_call_handle *call);
+
+ int (*send_data)(struct kafs_call_handle *call,
+ struct iovec *iov,
+ int ioc,
+ int more);
+
+ int (*recv_data)(struct kafs_call_handle *call,
+ struct iovec *iov,
+ int ioc,
+ enum kafs_recv_condition *_cond,
+ uint32_t *_supplementary_info);
+
+ int (*poll_connection)(struct kafs_connection_handle *conn,
+ int nowait);
+};
+
+extern struct kafs_transport kafs_transport;
+
+#endif /* _KAFS_TRANSPORT_H */
# Allocate call
if what == "request":
o.rxsrc("\n")
- o.rxsrc("\tcall = rxrpc_alloc_call(z_conn, 0);\n")
+ o.rxsrc("\tcall = rxrpc_make_call(z_conn);\n")
o.rxsrc("\tif (!call)\n")
o.rxsrc("\t\treturn ", bad_ret, ";\n")
o.rxsrc("\tcall->decoder = rxgen_decode_", func.name, "_response;\n")
o.pysrc(" */\n")
o.pysrc("static PyMethodDef module_methods[] = {\n")
+ o.pysrc("\t{\"load_kafs_transport\", (PyCFunction)kafs_py_load_kafs_transport, METH_VARARGS, \"\" },\n")
+ o.pysrc("\t{\"rx_new_transport\", (PyCFunction)kafs_py_rx_new_transport, METH_VARARGS, \"\" },\n")
o.pysrc("\t{\"rx_new_connection\", (PyCFunction)kafs_py_rx_new_connection, METH_VARARGS, \"\" },\n")
o.pysrc("\t{\"afs_string_to_key\", (PyCFunction)kafs_py_string_to_key, METH_VARARGS, \"\" },\n")
# Load types
if o.xdr.py_type_defs:
o.pysrc("\tif (")
+ o.pysrc("PyType_Ready(&py_rx_transportType) < 0 ||\n\t ")
o.pysrc("PyType_Ready(&py_rx_connectionType) < 0 ||\n\t ")
o.pysrc("PyType_Ready(&py_rx_split_infoType) < 0")
for pyt in o.xdr.py_type_defs:
o.pysrc("\t\treturn NULL;\n")
o.pysrc("\n")
- o.pysrc("\tcall = rxrpc_alloc_call(z_conn->x, 0);\n")
+ o.pysrc("\tcall = rxrpc_make_call(z_conn->x);\n")
o.pysrc("\tif (!call) {\n")
if func.split:
o.pysrc("\t\tPy_XDECREF(split_info);\n");
"kafs/rxgen_afs_py.c",
"kafs/py_passwd.c",
"kafs/py_rxgen.c",
+ "kafs/py_rxtrans.c",
"kafs/py_rxconn.c",
"kafs/py_rxsplit.c",
- "kafs/af_rxrpc.c"
+ "kafs/transport.c"
],
extra_compile_args = [
"-O0",