diff --git a/random-sensor-ldmc/random-sensor.cpp b/random-sensor-ldmc/random-sensor.cpp index 91a73ea..8b6739a 100644 --- a/random-sensor-ldmc/random-sensor.cpp +++ b/random-sensor-ldmc/random-sensor.cpp @@ -5,7 +5,9 @@ #include <semaphore> #include <google/protobuf/arena.h> #include <thread> +#include <map> +#include "rpc_service.hpp" struct StatusRequest { }; @@ -104,180 +106,57 @@ public: void get_status( - StatusRequest const * request, - SensorStatus * status + StatusRequest const & request, + SensorStatus & status ) { - status->online_since = std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count(); - status->battery_capacity = 8200; - status->battery_charge = bat_charge; - status->max_sampling_rate = 10; - status->cur_sampling_rate = std::chrono::duration_cast<std::chrono::milliseconds>(period_duration).count(); - status->cur_chunk_size = chunk_size; - status->n_chunk_capacity = n_chunks_capacity(); - status->n_full_data_chunks = n_full_chunks(); - status->n_empty_data_chunks = n_empty_chunks(); + status.online_since = std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count(); + status.battery_capacity = 8200; + status.battery_charge = bat_charge; + status.max_sampling_rate = 10; + status.cur_sampling_rate = std::chrono::duration_cast<std::chrono::milliseconds>(period_duration).count(); + status.cur_chunk_size = chunk_size; + status.n_chunk_capacity = n_chunks_capacity(); + status.n_full_data_chunks = n_full_chunks(); + status.n_empty_data_chunks = n_empty_chunks(); } void set_sampling_period( - SetSamplingPeriodRequest const * request, - SetSamplingPeriodResult * result + SetSamplingPeriodRequest const & request, + SetSamplingPeriodResult & result ) { - *result = SAMPLING_PERIOD_OK; + result = SAMPLING_PERIOD_OK; } void pop_data_chunk( - PopDataChunkRequest const * request, - DataChunk * data_chunk + PopDataChunkRequest const & request, + DataChunk & data_chunk ) { } }; -#include <iostream> -#include <string> -#include <unistd.h> -#include <arpa/inet.h> -#include <sys/socket.h> -#include <cstring> - - -enum RpcTag { +enum SensorRpcTag { GET_STATUS = 0, SET_SAMPLING_PERIOD, POP_DATA_CHUNK }; -struct RpcService { -private: - uint32_t port; - RandomSensor< 1048576 > & sensor; - -public: - RpcService( - uint32_t port, - RandomSensor< 1048576 > & sensor - ) :port(port), sensor(sensor) - { - } - - void handle_client(int client_sock) { - char request_buffer[1024]; - ssize_t bytes_received; - - // Receive data from client - bytes_received = recv(client_sock, request_buffer, sizeof(request_buffer), 0); - if (bytes_received == -1) { - std::cerr << "Failed to receive data" << std::endl; - return; - } - - printf("received %lu bytes:\n", bytes_received); - for( size_t i = 0; i < bytes_received;) - { - printf("%4lx : ", i); - for( size_t col = 0; col < 8; ++col ) { - printf("%2x ", request_buffer[i++]); - } - printf("\n"); - } - - if (bytes_received >= 8 ) { - RpcTag * tag = (RpcTag*) request_buffer; - switch( *tag ) { - case GET_STATUS: { - printf("RPC call get_status()\n"); - StatusRequest * request = (StatusRequest * ) (request_buffer + sizeof(RpcTag)); - - SensorStatus status; - sensor.get_status(request, &status); - - send(client_sock, (char*)&status, sizeof(status), 0); - break; - } - - case SET_SAMPLING_PERIOD: { - SetSamplingPeriodRequest * request = (SetSamplingPeriodRequest * ) (request_buffer + sizeof(RpcTag)); - printf("RPC call set_sampling_period()\n"); - - SetSamplingPeriodResult result; - sensor.set_sampling_period(request, &result); - - send(client_sock, (char*)&result, sizeof(result), 0); - break; - } - - case POP_DATA_CHUNK: { - PopDataChunkRequest * request = (PopDataChunkRequest * ) (request_buffer + sizeof(RpcTag)); - printf("RPC call pop_data_chunk()\n"); - - DataChunk data_chunk; - sensor.pop_data_chunk(request, &data_chunk); - - send(client_sock, (char*)&data_chunk, sizeof(data_chunk), 0); - break; - } - - default: { - std::string response("invalid RPC"); - send(client_sock, response.c_str(), response.length(), 0); - } - } - } - } - - int run() { - int server_sock, client_sock; - struct sockaddr_in server_addr, client_addr; - socklen_t addr_len = sizeof(client_addr); - - // Create socket - server_sock = socket(AF_INET, SOCK_STREAM, 0); - if (server_sock == -1) { - std::cerr << "Failed to create socket" << std::endl; - return 1; - } - - // Set up server address - server_addr.sin_family = AF_INET; - server_addr.sin_addr.s_addr = INADDR_ANY; - server_addr.sin_port = htons(port); - - // Bind socket to the address and port - if (bind(server_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { - std::cerr << "Failed to bind socket" << std::endl; - return 1; - } - - // Listen for incoming connections - if (listen(server_sock, 3) == -1) { - std::cerr << "Failed to listen on socket" << std::endl; - return 1; - } - std::cout << "Server listening on port " << port << "..." << std::endl; - - // Accept and handle client connections - while (true) { - client_sock = accept(server_sock, (struct sockaddr*)&client_addr, &addr_len); - if (client_sock == -1) { - std::cerr << "Failed to accept connection" << std::endl; - continue; - } - - std::cout << "Client connected!" << std::endl; - handle_client(client_sock); - - // Close the client socket after handling the request - close(client_sock); - } - - // Close the server socket - close(server_sock); - } -}; - int main( int argc, char* argv[] ) { RandomSensor< 1048576 > sensor; - RpcService service(8070, sensor); + RpcService<SensorRpcTag> service(8070); + service.register_handler( GET_STATUS, + std::function([&sensor]( StatusRequest const & request, SensorStatus & status ) { + sensor.get_status(request, status); + })); + service.register_handler( SET_SAMPLING_PERIOD, + std::function([&sensor]( SetSamplingPeriodRequest const & request, SetSamplingPeriodResult & result ) { + sensor.set_sampling_period(request, result); + })); + service.register_handler( POP_DATA_CHUNK, + std::function([&sensor]( PopDataChunkRequest const & request, DataChunk & result ) { + sensor.pop_data_chunk(request, result); + })); + return service.run(); } diff --git a/random-sensor-ldmc/rpc_service.hpp b/random-sensor-ldmc/rpc_service.hpp new file mode 100644 index 0000000..db03e82 --- /dev/null +++ b/random-sensor-ldmc/rpc_service.hpp @@ -0,0 +1,115 @@ +#pragma once + +#include <iostream> +#include <string> +#include <unistd.h> +#include <arpa/inet.h> +#include <sys/socket.h> +#include <cstring> +#include <map> +#include <functional> + +template < typename T_RpcTag > +struct RpcService { +private: + uint32_t port; + std::map< T_RpcTag, std::function<void( char const * input, size_t input_len, int client_sock )> > handler; + +public: + RpcService(uint32_t port) :port(port) + {} + + template < typename F, typename Request, typename Response > + void register_handler(T_RpcTag tag, std::function<F(Request const&, Response&)> f) { + handler.emplace( + tag, + std::function([f]( char const * request_buffer, size_t len, int client_sock ) { + Request * request = (Request*) (request_buffer + sizeof(T_RpcTag)); + Response response; + f( *request, response ); + send(client_sock, (void const*)&response, sizeof(response), 0); + }) + ); + } + + void handle_client(int client_sock) { + char request_buffer[1024]; + ssize_t bytes_received; + + // Receive data from client + bytes_received = recv(client_sock, request_buffer, sizeof(request_buffer), 0); + if (bytes_received == -1) { + std::cerr << "Failed to receive data" << std::endl; + return; + } + + printf("received %lu bytes:\n", bytes_received); + for( size_t i = 0; i < bytes_received;) + { + printf("%4lx : ", i); + for( size_t col = 0; col < 8; ++col ) { + printf("%2x ", request_buffer[i++]); + } + printf("\n"); + } + + if (bytes_received >= sizeof(T_RpcTag) ) { + T_RpcTag * tag = (T_RpcTag*) request_buffer; + if( handler.contains(*tag) ){ + (handler[*tag])( request_buffer, bytes_received, client_sock ); + } else { + std::string response("invalid RPC"); + send(client_sock, response.c_str(), response.length(), 0); + } + } + } + + int run() { + int server_sock, client_sock; + struct sockaddr_in server_addr, client_addr; + socklen_t addr_len = sizeof(client_addr); + + // Create socket + server_sock = socket(AF_INET, SOCK_STREAM, 0); + if (server_sock == -1) { + std::cerr << "Failed to create socket" << std::endl; + return 1; + } + + // Set up server address + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port); + + // Bind socket to the address and port + if (bind(server_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { + std::cerr << "Failed to bind socket" << std::endl; + return 1; + } + + // Listen for incoming connections + if (listen(server_sock, 3) == -1) { + std::cerr << "Failed to listen on socket" << std::endl; + return 1; + } + std::cout << "Server listening on port " << port << "..." << std::endl; + + // Accept and handle client connections + while (true) { + client_sock = accept(server_sock, (struct sockaddr*)&client_addr, &addr_len); + if (client_sock == -1) { + std::cerr << "Failed to accept connection" << std::endl; + continue; + } + + std::cout << "Client connected!" << std::endl; + handle_client(client_sock); + + // Close the client socket after handling the request + close(client_sock); + } + + // Close the server socket + close(server_sock); + } +};