#include <iostream> #include <chrono> #include <cmath> #include <random> #include <semaphore> #include <google/protobuf/arena.h> #include <thread> struct StatusRequest { }; struct SensorStatus { uint64_t online_since; uint32_t battery_charge; uint32_t battery_capacity; uint32_t max_sampling_rate; uint32_t cur_sampling_rate; uint32_t max_chunk_size; uint32_t cur_chunk_size; uint32_t n_chunk_capacity; uint32_t n_full_data_chunks; uint32_t n_empty_data_chunks; }; struct SetSamplingPeriodRequest { uint32_t new_sampling_period; }; enum SetSamplingPeriodResult { SAMPLING_PERIOD_OK, SAMPLING_PERIOD_OUT_OF_RANGE }; struct PopDataChunkRequest { }; struct DataChunk { uint64_t begin; uint32_t sampling_period; float temperature_data[]; }; template < size_t T_Capacity > struct RandomSensor { private: std::chrono::time_point<std::chrono::high_resolution_clock> online_since; std::chrono::duration<uint64_t, std::milli> period_duration; uint32_t chunk_size; uint32_t next_free; uint32_t next_full; float bat_charge; std::counting_semaphore<> sem_full; std::counting_semaphore<> sem_free; float data[ T_Capacity ]; std::thread sensor_thread; void generate_value() { sem_free.acquire(); data[ next_free ] = (float) next_free / 100.0; next_free = (next_free + 1) % T_Capacity; sem_full.release(); } public: RandomSensor() : online_since(std::chrono::high_resolution_clock::now()) , period_duration( 10 ) , chunk_size( 8192 ) , next_free( 0 ) , next_full( 0 ) , bat_charge( 8200.0 ) , sem_full( 0 ) , sem_free( T_Capacity ) , sensor_thread([&] { while(bat_charge > 0) { generate_value(); bat_charge -= 0.01; std::this_thread::sleep_for( period_duration ); } }) {} ~RandomSensor() { sensor_thread.join(); } uint32_t n_chunks_capacity() const { return T_Capacity / chunk_size; } uint32_t n_full_chunks() const { return (( next_free - next_full ) % T_Capacity) / chunk_size; } uint32_t n_empty_chunks() const { return n_chunks_capacity() - n_full_chunks(); } void get_status( StatusRequest const * request, SensorStatus * status ) { status->online_since = std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count(); status->battery_capacity = 8200; status->battery_charge = bat_charge; status->max_sampling_rate = 10; status->cur_sampling_rate = std::chrono::duration_cast<std::chrono::milliseconds>(period_duration).count(); status->cur_chunk_size = chunk_size; status->n_chunk_capacity = n_chunks_capacity(); status->n_full_data_chunks = n_full_chunks(); status->n_empty_data_chunks = n_empty_chunks(); } void set_sampling_period( SetSamplingPeriodRequest const * request, SetSamplingPeriodResult * result ) { *result = SAMPLING_PERIOD_OK; } void pop_data_chunk( PopDataChunkRequest const * request, DataChunk * data_chunk ) { } }; #include <iostream> #include <string> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h> #include <cstring> enum RpcTag { GET_STATUS = 0, SET_SAMPLING_PERIOD, POP_DATA_CHUNK }; struct RpcService { private: uint32_t port; RandomSensor< 1048576 > & sensor; public: RpcService( uint32_t port, RandomSensor< 1048576 > & sensor ) :port(port), sensor(sensor) { } void handle_client(int client_sock) { char request_buffer[1024]; ssize_t bytes_received; // Receive data from client bytes_received = recv(client_sock, request_buffer, sizeof(request_buffer), 0); if (bytes_received == -1) { std::cerr << "Failed to receive data" << std::endl; return; } printf("received %lu bytes:\n", bytes_received); for( size_t i = 0; i < bytes_received;) { printf("%4lx : ", i); for( size_t col = 0; col < 8; ++col ) { printf("%2x ", request_buffer[i++]); } printf("\n"); } if (bytes_received >= 8 ) { RpcTag * tag = (RpcTag*) request_buffer; switch( *tag ) { case GET_STATUS: { printf("RPC call get_status()\n"); StatusRequest * request = (StatusRequest * ) (request_buffer + sizeof(RpcTag)); SensorStatus status; sensor.get_status(request, &status); send(client_sock, (char*)&status, sizeof(status), 0); break; } case SET_SAMPLING_PERIOD: { SetSamplingPeriodRequest * request = (SetSamplingPeriodRequest * ) (request_buffer + sizeof(RpcTag)); printf("RPC call set_sampling_period()\n"); SetSamplingPeriodResult result; sensor.set_sampling_period(request, &result); send(client_sock, (char*)&result, sizeof(result), 0); break; } case POP_DATA_CHUNK: { PopDataChunkRequest * request = (PopDataChunkRequest * ) (request_buffer + sizeof(RpcTag)); printf("RPC call pop_data_chunk()\n"); DataChunk data_chunk; sensor.pop_data_chunk(request, &data_chunk); send(client_sock, (char*)&data_chunk, sizeof(data_chunk), 0); break; } default: { std::string response("invalid RPC"); send(client_sock, response.c_str(), response.length(), 0); } } } } int run() { int server_sock, client_sock; struct sockaddr_in server_addr, client_addr; socklen_t addr_len = sizeof(client_addr); // Create socket server_sock = socket(AF_INET, SOCK_STREAM, 0); if (server_sock == -1) { std::cerr << "Failed to create socket" << std::endl; return 1; } // Set up server address server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(port); // Bind socket to the address and port if (bind(server_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { std::cerr << "Failed to bind socket" << std::endl; return 1; } // Listen for incoming connections if (listen(server_sock, 3) == -1) { std::cerr << "Failed to listen on socket" << std::endl; return 1; } std::cout << "Server listening on port " << port << "..." << std::endl; // Accept and handle client connections while (true) { client_sock = accept(server_sock, (struct sockaddr*)&client_addr, &addr_len); if (client_sock == -1) { std::cerr << "Failed to accept connection" << std::endl; continue; } std::cout << "Client connected!" << std::endl; handle_client(client_sock); // Close the client socket after handling the request close(client_sock); } // Close the server socket close(server_sock); } }; int main( int argc, char* argv[] ) { RandomSensor< 1048576 > sensor; RpcService service(8070, sensor); return service.run(); }