/* * Copyright 2016 * * ReceiverThreads.cpp * * Created on: 21.07.2016 * Author: Tobias Frust */ #include #include "ReceiverThreads.h" #include "../UDPServer/UDPServer.h" #include #include //#define USE_VMA ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort) : timeIntervall_{timeIntervall}, firstPort_{firstPort}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0}, nbufs_(100) { int max_packet_size = 65535; #ifdef USE_VMA vma_ = vma_get_api(); // First call fails? if (!vma_) vma_ = vma_get_api(); #else vma_ = NULL; #endif ringbuf_.resize(nbufs_); for (int i = 0; i < nbufs_; i++) ringbuf_[i].resize(numberOfDetectorModules * max_packet_size); for(auto i = 0; i < numberOfDetectorModules; i++){ receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, firstPort+i); } for(auto i = 0; i < numberOfDetectorModules; i++){ receiverModules_[i].join(); } } auto ReceiverThreads::receiverThread(const int port) -> void { int vma_ring_fd; int max_packets = 100; int max_packet_size = 65535; int id = port - firstPort_; UDPServer server = UDPServer(address_, port); std::vector> buffers; std::size_t rcv_index = 0; std::size_t rcv_packets = 0; std::size_t rcv_size = 0; std::size_t lastIndex{0}; std::size_t loss = 0; struct mmsghdr msg[max_packets]; struct iovec msgvec[max_packets]; buffers.resize(max_packets); memset(msg, 0, sizeof(msg)); memset(msgvec, 0, sizeof(msgvec)); for (int i = 0; i < max_packets; i++) { buffers[i].resize(max_packet_size); msgvec[i].iov_base = buffers[i].data(); msgvec[i].iov_len = buffers[i].size(); msg[i].msg_hdr.msg_iov = &msgvec[i]; msg[i].msg_hdr.msg_iovlen = 1; } if (vma_) { vma_->get_socket_rings_fds(server.f_socket, &vma_ring_fd, 1); if (vma_ring_fd < 0) throw "Can't get ring fds"; } printf("ID %d listening %d\n", id, port); double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; auto ts_last = std::chrono::high_resolution_clock::now(); int rbuf = 0; while(true){ int packets; struct vma_completion_t vma_comps[max_packets]; struct vma_packet_desc_t vma_packs[max_packets]; if (vma_) { // Seems crashes on ConnectX-3, requires later cards according to documentation (section 8.2) packets = vma_->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0); } else { packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); } if (packets >= 0) { for (int i = 0; i < packets; i++) { int bytes; unsigned short *buf; char *ringptr = ringbuf_[rbuf++].data() + id * max_packet_size; if (rbuf == nbufs_) rbuf = 0; if (vma_) { vma_packs[i] = vma_comps[i].packet; switch (vma_comps[i].events) { case VMA_SOCKETXTREME_PACKET: break; case EPOLLERR: case EPOLLRDHUP: printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); throw "Polling error"; default: printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); throw "Polling error"; } bytes = vma_comps[i].packet.total_len; buf = reinterpret_cast(vma_comps[i].packet.buff_lst->payload); int n_bufs = vma_comps[i].packet.num_bufs; struct vma_buff_t *vma_buf = vma_comps[i].packet.buff_lst; for (int j = 0; j < n_bufs; j++) { memcpy(ringptr, vma_buf->payload, vma_buf->len); ringptr += vma_buf->len; vma_buf = vma_buf->next; } } else { bytes = msg[i].msg_len; buf = reinterpret_cast(msgvec[i].iov_base); memcpy(ringptr, buf, bytes); } rcv_packets++; rcv_size += bytes; std::size_t index =*((std::size_t *)buf); int diff = index - lastIndex - 1; if(diff > 0){ loss += diff; } /* if (port == 4000) { printf("%i:%i:%i:%i,", index, diff, loss, i); }*/ lastIndex = index; } if (vma_) { vma_->socketxtreme_free_vma_packets(vma_packs, packets); } } auto ts = std::chrono::high_resolution_clock::now(); std::chrono::nanoseconds d = ts - ts_last; if (d.count() >= 1000000000) { printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms [VMA: %i, port: %i]\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000, (vma_?1:0), port); rcv_packets = 0; rcv_size = 0; rcv_index = lastIndex; loss = 0; ts_last = ts; } } std::cout << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; loss_ += loss; }