diff options
| author | Timo Dritschler <timo.dritschler@kit.edu> | 2014-05-13 16:25:42 +0200 | 
|---|---|---|
| committer | Timo Dritschler <timo.dritschler@kit.edu> | 2014-05-13 16:25:42 +0200 | 
| commit | 72cb71c99131db200871dac9e17acefdf97292e7 (patch) | |
| tree | a862c2c2e91ccecee54a06d6ad88cac707e2c449 | |
| parent | 86c5a2afb4cbebbc7b41b9815c993c4fcc099b64 (diff) | |
| download | kiro-72cb71c99131db200871dac9e17acefdf97292e7.tar.gz kiro-72cb71c99131db200871dac9e17acefdf97292e7.tar.bz2 kiro-72cb71c99131db200871dac9e17acefdf97292e7.tar.xz kiro-72cb71c99131db200871dac9e17acefdf97292e7.zip | |
Added kiro_attach_qp to kiro-rdma.h that creates a new QP for
a rdma_cm_id and attaches it.
Changed kiro_destroy_connection to work on a rdma_cm_id instead.
Changed kiro-server accordingly.
Restructured kiro-server to use an event loop thread to listen for
new client connections.
Restructured kiro-server to no longer memorize the client connections.
It is currently unneccessary as no control-flow is exchanged.
| -rw-r--r-- | kiro-client.c | 4 | ||||
| -rw-r--r-- | kiro-rdma.h | 38 | ||||
| -rw-r--r-- | kiro-server.c | 163 | ||||
| -rw-r--r-- | test-client.c | 10 | 
4 files changed, 146 insertions, 69 deletions
| diff --git a/kiro-client.c b/kiro-client.c index b6a515e..807d7d5 100644 --- a/kiro-client.c +++ b/kiro-client.c @@ -69,8 +69,8 @@ static void kiro_client_init (KiroClient *self)  static void  kiro_client_finalize (GObject *object)  { -    KiroClient *self = KIRO_CLIENT(object); -    KiroClientPrivate * priv = KIRO_CLIENT_GET_PRIVATE(self); +    //KiroClient *self = KIRO_CLIENT(object); +    //KiroClientPrivate * priv = KIRO_CLIENT_GET_PRIVATE(self);      //PASS  } diff --git a/kiro-rdma.h b/kiro-rdma.h index 696c880..fa16fd1 100644 --- a/kiro-rdma.h +++ b/kiro-rdma.h @@ -77,6 +77,32 @@ struct kiro_rdma_mem {  }; +static int kiro_attach_qp (struct rdma_cm_id *id) +{ +    if(!id) +        return -1; +     +    id->pd = ibv_alloc_pd(id->verbs); +    id->send_cq_channel = ibv_create_comp_channel(id->verbs); +    id->recv_cq_channel = id->send_cq_channel; //we use one shared completion channel +    id->send_cq = ibv_create_cq(id->verbs, 1, id, id->send_cq_channel, 0); +    id->recv_cq = id->send_cq; //we use one shared completion queue +     +    struct ibv_qp_init_attr qp_attr; +    memset(&qp_attr, 0, sizeof(struct ibv_qp_init_attr)); +    qp_attr.qp_context = (uintptr_t)id; +    qp_attr.send_cq = id->send_cq; +    qp_attr.recv_cq = id->recv_cq; +    qp_attr.qp_type = IBV_QPT_RC; +    qp_attr.cap.max_send_wr = 1; +    qp_attr.cap.max_recv_wr = 1; +    qp_attr.cap.max_send_sge = 1; +    qp_attr.cap.max_recv_sge = 1; +     +    return rdma_create_qp(id, id->pd, &qp_attr); +} + +  static int kiro_register_rdma_memory (struct ibv_pd *pd, struct ibv_mr **mr, void *mem, size_t mem_size, int access)  { @@ -186,21 +212,17 @@ static void kiro_destroy_connection_context (struct kiro_connection_context **ct  } -static void kiro_destroy_connection (struct kiro_connection **conn) +static void kiro_destroy_connection (struct rdma_cm_id **conn)  {      if(!(*conn))          return; -     -    if(!(*conn)->id) -        return; -    rdma_disconnect((*conn)->id); -    struct kiro_connection_context *ctx = (struct kiro_connection_context *)((*conn)->id->context); +    rdma_disconnect(*conn); +    struct kiro_connection_context *ctx = (struct kiro_connection_context *)((*conn)->context);      if(ctx)          kiro_destroy_connection_context(&ctx); -    rdma_destroy_ep((*conn)->id); -    free(*conn); +    rdma_destroy_ep(*conn);      *conn = NULL;  } diff --git a/kiro-server.c b/kiro-server.c index c017b07..52304c8 100644 --- a/kiro-server.c +++ b/kiro-server.c @@ -50,9 +50,13 @@ struct _KiroServerPrivate {      /* 'Real' private structures */      /* (Not accessible by properties) */ -    struct rdma_event_channel   *ec;        // Main Event Channel -    struct rdma_cm_id           *base;      // Base-Listening-Connection -    struct kiro_connection      *client;    // Connection to the client +    struct rdma_event_channel   *ec;            // Main Event Channel +    struct rdma_cm_id           *base;          // Base-Listening-Connection +    struct kiro_connection      *client;        // Connection to the client +    pthread_t                   event_listener; // Pointer to the completion-listener thread of this connection +    pthread_mutex_t             mtx;            // Mutex to signal the listener-thread termination +    void                        *mem;           // Pointer to the server buffer +    size_t                      mem_size;       // Server Buffer Size in bytes  }; @@ -82,13 +86,23 @@ kiro_server_class_init (KiroServerClass *klass)  } -static int connect_client (struct kiro_connection *client) +static int connect_client (struct rdma_cm_id *client)  { +    if(!client) +        return -1; +         +    if( -1 == kiro_attach_qp(client)) +    { +        printf("Could not create a QP for the new connection.\n"); +        rdma_destroy_id(client); +        return -1; +    } +          struct kiro_connection_context *ctx = (struct kiro_connection_context *)calloc(1,sizeof(struct kiro_connection_context));      if(!ctx)      {          printf("Failed to create connection context.\n"); -        rdma_destroy_id(client->id); +        rdma_destroy_id(client);          return -1;      } @@ -100,25 +114,25 @@ static int connect_client (struct kiro_connection *client)          goto error;      } -    ctx->cf_mr_recv = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE); -    ctx->cf_mr_send = kiro_create_rdma_memory(client->id->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE); +    ctx->cf_mr_recv = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE); +    ctx->cf_mr_send = kiro_create_rdma_memory(client->pd, sizeof(struct kiro_ctrl_msg), IBV_ACCESS_LOCAL_WRITE);      if(!ctx->cf_mr_recv || !ctx->cf_mr_send)      {          printf("Failed to register control message memory.\n");          goto error;      }      ctx->cf_mr_recv->size = ctx->cf_mr_send->size = sizeof(struct kiro_ctrl_msg); -    client->id->context = ctx; +    client->context = ctx; -    if(rdma_post_recv(client->id, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr)) +    if(rdma_post_recv(client, client, ctx->cf_mr_recv->mem, ctx->cf_mr_recv->size, ctx->cf_mr_recv->mr))      {          printf("Posting preemtive receive for connection failed.\n");          goto error;      } -    if(rdma_accept(client->id, NULL)) +    if(rdma_accept(client, NULL))      { -        printf("Failed to establish connection to the server.\n"); +        printf("Failed to establish connection to the client with error: %i.\n", errno);          goto error;      }      printf("Client Connected.\n"); @@ -126,16 +140,16 @@ static int connect_client (struct kiro_connection *client)  error: -    rdma_reject(client->id, NULL, 0); +    rdma_reject(client, NULL, 0);      kiro_destroy_connection_context(&ctx); -    rdma_destroy_id(client->id); +    rdma_destroy_id(client);      return -1;  } -static int welcome_client (struct kiro_connection *client, void *mem, size_t mem_size) +static int welcome_client (struct rdma_cm_id *client, void *mem, size_t mem_size)  { -    struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->id->context); +    struct kiro_connection_context *ctx = (struct kiro_connection_context *)(client->context);      ctx->rdma_mr = (struct kiro_rdma_mem *)calloc(1, sizeof(struct kiro_rdma_mem));      if(!ctx->rdma_mr)      { @@ -145,7 +159,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem      ctx->rdma_mr->mem = mem;      ctx->rdma_mr->size = mem_size; -    ctx->rdma_mr->mr = rdma_reg_read(client->id, ctx->rdma_mr->mem, ctx->rdma_mr->size); +    ctx->rdma_mr->mr = rdma_reg_read(client, ctx->rdma_mr->mem, ctx->rdma_mr->size);      if(!ctx->rdma_mr->mr)      {          printf("Failed to register RDMA Memory Region.\n"); @@ -157,7 +171,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem      msg->msg_type = KIRO_ACK_RDMA;      msg->peer_mri = *(ctx->rdma_mr->mr); -    if(rdma_post_send(client->id, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED)) +    if(rdma_post_send(client, client, ctx->cf_mr_send->mem, ctx->cf_mr_send->size, ctx->cf_mr_send->mr, IBV_SEND_SIGNALED))      {          printf("Failure while trying to post SEND.\n");          kiro_destroy_rdma_memory(ctx->rdma_mr); @@ -166,7 +180,7 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem      struct ibv_wc wc; -    if(rdma_get_send_comp(client->id, &wc) < 0) +    if(rdma_get_send_comp(client, &wc) < 0)      {          printf("Failed to post RDMA MRI to client.\n");          kiro_destroy_rdma_memory(ctx->rdma_mr); @@ -178,6 +192,71 @@ static int welcome_client (struct kiro_connection *client, void *mem, size_t mem  } +void * event_loop (void *self) +{ +    KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE((KiroServer *)self); +    struct rdma_cm_event *active_event; + +    int stop = 0; + +    while(0 == stop) { +        if(0 <= rdma_get_cm_event(priv->ec, &active_event)) +        { +             +            struct rdma_cm_event *ev = malloc(sizeof(*active_event)); +            if(!ev) +            { +                printf("Unable to allocate memory for Event handling!\n"); +                rdma_ack_cm_event(active_event);  +                continue; +            } +            memcpy(ev, active_event, sizeof(*active_event)); +            rdma_ack_cm_event(active_event);             +             +            if (ev->event == RDMA_CM_EVENT_CONNECT_REQUEST) +            { + +                /* +                priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection)); +                if(!(priv->client)) +                { +                    printf("Failed to create container for client connection.\n"); +                    free(ev); +                    continue; +                } +                priv->client->identifier = 0; //First Client +                priv->client->id = ev->id; +                */ +                 +                if(0 == connect_client(ev->id)) +                { +                    // Connection set-up successfully! (Server) +                    // Post a welcoming "Recieve" for handshaking +                    welcome_client(ev->id, priv->mem, priv->mem_size); +                } +            } +            else if(ev->event == RDMA_CM_EVENT_DISCONNECTED) +            { +                printf("Got disconnect request.\n"); +                //pthread_mutex_unlock(&(priv->mtx)); +                kiro_destroy_connection(&(ev->id)); +                printf("Connection closed successfully\n"); +            }             +            free(ev); +        } + +        // Mutex will be freed as a signal to stop request +        if(0 == pthread_mutex_trylock(&(priv->mtx))) +            stop = 1; +    } + +    printf("Closing Event Listener Thread\n"); +    return NULL; +} + + + +  int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, size_t mem_size)  {      KiroServerPrivate *priv = KIRO_SERVER_GET_PRIVATE(self); @@ -245,43 +324,10 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s          rdma_destroy_ep(priv->base);          return -1;      } -    printf("Enpoint listening.\n"); -     -     -    priv->client = (struct kiro_connection *)calloc(1, sizeof(struct kiro_connection)); -    if(!(priv->client)) -    { -        printf("Failed to create container for client connection.\n"); -        return -1; -    } -    priv->client->identifier = 0; //First Client -     -    printf("Waiting for connection request.\n"); -    if(rdma_get_request(priv->base, &(priv->client->id))) -    { -        printf("Failure waiting for clienet connection.\n"); -        rdma_destroy_ep(priv->base); -        return -1; -    } -    printf("Connection Request received.\n"); -     -     -    if(connect_client(priv->client)) -    { -        printf("Client connection failed!\n"); -        rdma_destroy_ep(priv->base); -        free(priv->client); -        return -1; -    } -     -    if(welcome_client(priv->client, mem, mem_size)) -    { -        printf("Failed to setup client communication.\n"); -        kiro_destroy_connection(&(priv->client)); -        rdma_destroy_id(priv->base); -        return -1; -    } +    priv->mem = mem; +    priv->mem_size = mem_size; +      priv->ec = rdma_create_event_channel();      int oldflags = fcntl (priv->ec->fd, F_GETFL, 0);      /* Only change the FD Mode if we were able to get its flags */ @@ -293,10 +339,15 @@ int kiro_server_start (KiroServer *self, char *address, char *port, void* mem, s      if(rdma_migrate_id(priv->base, priv->ec))      {          printf("Was unable to migrate connection to new Event Channel.\n"); -        kiro_destroy_connection(&(priv->client)); -        rdma_destroy_id(priv->base); +        rdma_destroy_ep(priv->base);          return -1;      } + +    pthread_mutex_init(&(priv->mtx), NULL); +    pthread_mutex_lock(&(priv->mtx)); +    pthread_create(&(priv->event_listener), NULL, event_loop, self); + +    printf("Enpoint listening.\n");      sleep(1);      return 0; diff --git a/test-client.c b/test-client.c index 65a3c08..469aa5e 100644 --- a/test-client.c +++ b/test-client.c @@ -38,9 +38,13 @@ int main ( int argc, char *argv[] )          return -1;      }      KiroClient *client = g_object_new(KIRO_TYPE_CLIENT, NULL); -    if(-1 != kiro_client_connect(client, argv[1], argv[2])) -        kiro_client_sync(client); +    if(-1 == kiro_client_connect(client, argv[1], argv[2])) +    { +        g_object_unref(client); +        return -1; +    } +    kiro_client_sync(client);      KiroTrb *trb = g_object_new(KIRO_TYPE_TRB, NULL);      kiro_trb_adopt(trb, kiro_client_get_memory(client)); @@ -67,7 +71,7 @@ int main ( int argc, char *argv[] )      int cont = 1; -    struct KiroTrbInfo *header = (struct KiroTrbInfo *)kiro_trb_get_raw_buffer(trb); +    //struct KiroTrbInfo *header = (struct KiroTrbInfo *)kiro_trb_get_raw_buffer(trb);      while(cont)      { | 
