summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--kiro-client.c4
-rw-r--r--kiro-rdma.h38
-rw-r--r--kiro-server.c163
-rw-r--r--test-client.c10
4 files changed, 146 insertions, 69 deletions
diff --git a/kiro-client.c b/kiro-client.c
index b6a515e..807d7d5 100644
--- a/kiro-client.c
+++ b/kiro-client.c
@@ -69,8 +69,8 @@ static void kiro_client_init (KiroClient *self)
static void
kiro_client_finalize (GObject *object)
{
- KiroClient *self = KIRO_CLIENT(object);
- KiroClientPrivate * priv = KIRO_CLIENT_GET_PRIVATE(self);
+ //KiroClient *self = KIRO_CLIENT(object);
+ //KiroClientPrivate * priv = KIRO_CLIENT_GET_PRIVATE(self);
//PASS
}
diff --git a/kiro-rdma.h b/kiro-rdma.h
index 696c880..fa16fd1 100644
--- a/kiro-rdma.h
+++ b/kiro-rdma.h
@@ -77,6 +77,32 @@ struct kiro_rdma_mem {
};
+static int kiro_attach_qp (struct rdma_cm_id *id)
+{
+ if(!id)
+ return -1;
+
+ id->pd = ibv_alloc_pd(id->verbs);
+ id->send_cq_channel = ibv_create_comp_channel(id->verbs);
+ id->recv_cq_channel = id->send_cq_channel; //we use one shared completion channel
+ id->send_cq = ibv_create_cq(id->verbs, 1, id, id->send_cq_channel, 0);
+ id->recv_cq = id->send_cq; //we use one shared completion queue
+
+ struct ibv_qp_init_attr qp_attr;
+ memset(&qp_attr, 0, sizeof(struct ibv_qp_init_attr));
+ qp_attr.qp_context = (uintptr_t)id;
+ qp_attr.send_cq = id->send_cq;
+ qp_attr.recv_cq = id->recv_cq;
+ qp_attr.qp_type = IBV_QPT_RC;
+ qp_attr.cap.max_send_wr = 1;
+ qp_attr.cap.max_recv_wr = 1;
+ qp_attr.cap.max_send_sge = 1;
+ qp_attr.cap.max_recv_sge = 1;
+
+ return rdma_create_qp(id, id->pd, &qp_attr);
+}
+
+
static int kiro_register_rdma_memory (struct ibv_pd *pd, struct ibv_mr **mr, void *mem, size_t mem_size, int access)
{
@@ -186,21 +212,17 @@ static void kiro_destroy_connection_context (struct kiro_connection_context **ct
}
-static void kiro_destroy_connection (struct kiro_connection **conn)
+static void kiro_destroy_connection (struct rdma_cm_id **conn)
{
if(!(*conn))
return;
-
- if(!(*conn)->id)
- return;
- rdma_disconnect((*conn)->id);
- struct kiro_connection_context *ctx = (struct kiro_connection_context *)((*conn)->id->context);
+ rdma_disconnect(*conn);
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)((*conn)->context);
if(ctx)
kiro_destroy_connection_context(&ctx);
- rdma_destroy_ep((*conn)->id);
- free(*conn);
+ rdma_destroy_ep(*conn);
*conn = NULL;
}
diff --git a/kiro-server.c b/kiro-server.c
index c017b07..52304c8 100644
--- a/kiro-server.c
+++ b/kiro-server.c
@@ -50,9 +50,13 @@ struct _KiroServerPrivate {
/* 'Real' private structures */
/* (Not accessible by properties) */
- struct rdma_event_channel *ec; // Main Event Channel
- struct rdma_cm_id *base; // Base-Listening-Connection
- struct kiro_connection *client; // Connection to the client
+ struct rdma_event_channel *ec; // Main Event Channel
+ struct rdma_cm_id *base; // Base-Listening-Connection
+ struct kiro_connection *client; // Connection to the client
+ pthread_t event_listener; // Pointer to the completion-listener thread of this connection
+ pthread_mutex_t mtx; // Mutex to signal the listener-thread termination
+ void *mem; // Pointer to the server buffer
+ size_t mem_size; // Server Buffer Size in bytes
};
@@ -82,13 +86,23 @@ kiro_server_class_init (KiroServerClass *klass)
}
-static int connect_client (struct kiro_connection *client)
+static int connect_client (struct rdma_cm_id *client)
{
+ if(!client)
+ return -1;
+
+ if( -1 == kiro_attach_qp(client))
+ {
+ printf("Could not create a QP for the new connection.\n");
+ rdma_destroy_id(client);
+ return -1;
+ }
+
struct kiro_connection_context *ctx = (struct kiro_connection_context *)calloc(1,sizeof(struct kiro_connection_context));
if(!ctx)
{
printf("Failed to create connection context.\n");
- rdma_destroy_id(client->id);
+ rdma_destroy_id(client);
return -1;
}
@@ -100,25 +114,25 @@ static int connect_client (struct kiro_connection *client)
goto error;
}
- ctx->cf_mr_recv = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
- ctx->cf_mr_send = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
+ ctx->cf_mr_recv = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
+ ctx->cf_mr_send = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);
if(!ctx->cf_mr_recv || !ctx->cf_mr_send)
{
printf("Failed to register control message memory.\n");
goto error;
}
ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof(struct kiro_ctrl_msg);
- client->id->context = ctx;
+ client->context = ctx;
- if(rdma_post_recv(client->id, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr))
+ if(rdma_post_recv(client, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr))
{
printf("Posting preemtive receive for connection failed.\n");
goto error;
}
- if(rdma_accept(client->id, NULL))
+ if(rdma_accept(client, NULL))
{
- printf("Failed to establish connection to the server.\n");
+ printf("Failed to establish connection to the client with error: %i.\n", errno);
goto error;
}
printf("Client Connected.\n");
@@ -126,16 +140,16 @@ static int connect_client (struct kiro_connection *client)
error:
- rdma_reject(client->id, NULL, 0);
+ rdma_reject(client, NULL, 0);
kiro_destroy_connection_context(&ctx);
- rdma_destroy_id(client->id);
+ rdma_destroy_id(client);
return -1;
}
-static int welcome_client (struct kiro_connection *client, void *mem, size_t mem_size)
+static int welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)
{
- struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->id->context);
+ struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->context);
ctx->rdma_mr = (struct kiro_rdma_mem *)calloc(1, sizeof(struct kiro_rdma_mem));
if(!ctx->rdma_mr)
{
@@ -145,7 +159,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
ctx->rdma_mr->mem = mem;
ctx->rdma_mr->size = mem_size;
- ctx->rdma_mr->mr = rdma_reg_read(client->id, ctx->rdma_mr->mem, ctx->rdma_mr->size);
+ ctx->rdma_mr->mr = rdma_reg_read(client, ctx->rdma_mr->mem, ctx->rdma_mr->size);
if(!ctx->rdma_mr->mr)
{
printf("Failed to register RDMA Memory Region.\n");
@@ -157,7 +171,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
msg->msg_type = KIRO_ACK_RDMA;
msg->peer_mri = *(ctx->rdma_mr->mr);
- if(rdma_post_send(client->id, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED))
+ if(rdma_post_send(client, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED))
{
printf("Failure while trying to post SEND.\n");
kiro_destroy_rdma_memory(ctx->rdma_mr);
@@ -166,7 +180,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
struct ibv_wc wc;
- if(rdma_get_send_comp(client->id, &wc) < 0)
+ if(rdma_get_send_comp(client, &wc) < 0)
{
printf("Failed to post RDMA MRI to client.\n");
kiro_destroy_rdma_memory(ctx->rdma_mr);
@@ -178,6 +192,71 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem
}
+void * event_loop (void *self)
+{
+ KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE((KiroServer *)self);
+ struct rdma_cm_event *active_event;
+
+ int stop = 0;
+
+ while(0 == stop) {
+ if(0 <= rdma_get_cm_event(priv->ec, &active_event))
+ {
+
+ struct rdma_cm_event *ev = malloc(sizeof(*active_event));
+ if(!ev)
+ {
+ printf("Unable to allocate memory for Event handling!\n");
+ rdma_ack_cm_event(active_event);
+ continue;
+ }
+ memcpy(ev, active_event, sizeof(*active_event));
+ rdma_ack_cm_event(active_event);
+
+ if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST)
+ {
+
+ /*
+ priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection));
+ if(!(priv->client))
+ {
+ printf("Failed to create container for client connection.\n");
+ free(ev);
+ continue;
+ }
+ priv->client->identifier = 0; //First Client
+ priv->client->id = ev->id;
+ */
+
+ if(0 == connect_client(ev->id))
+ {
+ // Connection set-up successfully! (Server)
+ // Post a welcoming "Recieve" for handshaking
+ welcome_client(ev->id, priv->mem, priv->mem_size);
+ }
+ }
+ else if(ev->event == RDMA_CM_EVENT_DISCONNECTED)
+ {
+ printf("Got disconnect request.\n");
+ //pthread_mutex_unlock(&(priv->mtx));
+ kiro_destroy_connection(&(ev->id));
+ printf("Connection closed successfully\n");
+ }
+ free(ev);
+ }
+
+ // Mutex will be freed as a signal to stop request
+ if(0 == pthread_mutex_trylock(&(priv->mtx)))
+ stop = 1;
+ }
+
+ printf("Closing Event Listener Thread\n");
+ return NULL;
+}
+
+
+
+
int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, size_t mem_size)
{
KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE(self);
@@ -245,43 +324,10 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s
rdma_destroy_ep(priv->base);
return -1;
}
- printf("Enpoint listening.\n");
-
-
- priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection));
- if(!(priv->client))
- {
- printf("Failed to create container for client connection.\n");
- return -1;
- }
- priv->client->identifier = 0; //First Client
-
- printf("Waiting for connection request.\n");
- if(rdma_get_request(priv->base, &(priv->client->id)))
- {
- printf("Failure waiting for clienet connection.\n");
- rdma_destroy_ep(priv->base);
- return -1;
- }
- printf("Connection Request received.\n");
-
-
- if(connect_client(priv->client))
- {
- printf("Client connection failed!\n");
- rdma_destroy_ep(priv->base);
- free(priv->client);
- return -1;
- }
-
- if(welcome_client(priv->client, mem, mem_size))
- {
- printf("Failed to setup client communication.\n");
- kiro_destroy_connection(&(priv->client));
- rdma_destroy_id(priv->base);
- return -1;
- }
+ priv->mem = mem;
+ priv->mem_size = mem_size;
+
priv->ec = rdma_create_event_channel();
int oldflags = fcntl (priv->ec->fd, F_GETFL, 0);
/* Only change the FD Mode if we were able to get its flags */
@@ -293,10 +339,15 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s
if(rdma_migrate_id(priv->base, priv->ec))
{
printf("Was unable to migrate connection to new Event Channel.\n");
- kiro_destroy_connection(&(priv->client));
- rdma_destroy_id(priv->base);
+ rdma_destroy_ep(priv->base);
return -1;
}
+
+ pthread_mutex_init(&(priv->mtx), NULL);
+ pthread_mutex_lock(&(priv->mtx));
+ pthread_create(&(priv->event_listener), NULL, event_loop, self);
+
+ printf("Enpoint listening.\n");
sleep(1);
return 0;
diff --git a/test-client.c b/test-client.c
index 65a3c08..469aa5e 100644
--- a/test-client.c
+++ b/test-client.c
@@ -38,9 +38,13 @@ int main ( int argc, char *argv[] )
return -1;
}
KiroClient *client = g_object_new(KIRO_TYPE_CLIENT, NULL);
- if(-1 != kiro_client_connect(client, argv[1], argv[2]))
- kiro_client_sync(client);
+ if(-1 == kiro_client_connect(client, argv[1], argv[2]))
+ {
+ g_object_unref(client);
+ return -1;
+ }
+ kiro_client_sync(client);
KiroTrb *trb = g_object_new(KIRO_TYPE_TRB, NULL);
kiro_trb_adopt(trb, kiro_client_get_memory(client));
@@ -67,7 +71,7 @@ int main ( int argc, char *argv[] )
int cont = 1;
- struct KiroTrbInfo *header = (struct KiroTrbInfo *)kiro_trb_get_raw_buffer(trb);
+ //struct KiroTrbInfo *header = (struct KiroTrbInfo *)kiro_trb_get_raw_buffer(trb);
while(cont)
{