refactor RpcService class
This commit is contained in:
parent
43b5f2ffb6
commit
82cd11a4af
2 changed files with 148 additions and 154 deletions
random-sensor-ldmc
|
@ -5,7 +5,9 @@
|
||||||
#include <semaphore>
|
#include <semaphore>
|
||||||
#include <google/protobuf/arena.h>
|
#include <google/protobuf/arena.h>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <map>
|
||||||
|
|
||||||
|
#include "rpc_service.hpp"
|
||||||
|
|
||||||
struct StatusRequest {
|
struct StatusRequest {
|
||||||
};
|
};
|
||||||
|
@ -104,180 +106,57 @@ public:
|
||||||
|
|
||||||
|
|
||||||
void get_status(
|
void get_status(
|
||||||
StatusRequest const * request,
|
StatusRequest const & request,
|
||||||
SensorStatus * status
|
SensorStatus & status
|
||||||
) {
|
) {
|
||||||
status->online_since = std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count();
|
status.online_since = std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count();
|
||||||
status->battery_capacity = 8200;
|
status.battery_capacity = 8200;
|
||||||
status->battery_charge = bat_charge;
|
status.battery_charge = bat_charge;
|
||||||
status->max_sampling_rate = 10;
|
status.max_sampling_rate = 10;
|
||||||
status->cur_sampling_rate = std::chrono::duration_cast<std::chrono::milliseconds>(period_duration).count();
|
status.cur_sampling_rate = std::chrono::duration_cast<std::chrono::milliseconds>(period_duration).count();
|
||||||
status->cur_chunk_size = chunk_size;
|
status.cur_chunk_size = chunk_size;
|
||||||
status->n_chunk_capacity = n_chunks_capacity();
|
status.n_chunk_capacity = n_chunks_capacity();
|
||||||
status->n_full_data_chunks = n_full_chunks();
|
status.n_full_data_chunks = n_full_chunks();
|
||||||
status->n_empty_data_chunks = n_empty_chunks();
|
status.n_empty_data_chunks = n_empty_chunks();
|
||||||
}
|
}
|
||||||
|
|
||||||
void set_sampling_period(
|
void set_sampling_period(
|
||||||
SetSamplingPeriodRequest const * request,
|
SetSamplingPeriodRequest const & request,
|
||||||
SetSamplingPeriodResult * result
|
SetSamplingPeriodResult & result
|
||||||
) {
|
) {
|
||||||
*result = SAMPLING_PERIOD_OK;
|
result = SAMPLING_PERIOD_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
void pop_data_chunk(
|
void pop_data_chunk(
|
||||||
PopDataChunkRequest const * request,
|
PopDataChunkRequest const & request,
|
||||||
DataChunk * data_chunk
|
DataChunk & data_chunk
|
||||||
) {
|
) {
|
||||||
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
#include <iostream>
|
enum SensorRpcTag {
|
||||||
#include <string>
|
|
||||||
#include <unistd.h>
|
|
||||||
#include <arpa/inet.h>
|
|
||||||
#include <sys/socket.h>
|
|
||||||
#include <cstring>
|
|
||||||
|
|
||||||
|
|
||||||
enum RpcTag {
|
|
||||||
GET_STATUS = 0,
|
GET_STATUS = 0,
|
||||||
SET_SAMPLING_PERIOD,
|
SET_SAMPLING_PERIOD,
|
||||||
POP_DATA_CHUNK
|
POP_DATA_CHUNK
|
||||||
};
|
};
|
||||||
|
|
||||||
struct RpcService {
|
|
||||||
private:
|
|
||||||
uint32_t port;
|
|
||||||
RandomSensor< 1048576 > & sensor;
|
|
||||||
|
|
||||||
public:
|
|
||||||
RpcService(
|
|
||||||
uint32_t port,
|
|
||||||
RandomSensor< 1048576 > & sensor
|
|
||||||
) :port(port), sensor(sensor)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void handle_client(int client_sock) {
|
|
||||||
char request_buffer[1024];
|
|
||||||
ssize_t bytes_received;
|
|
||||||
|
|
||||||
// Receive data from client
|
|
||||||
bytes_received = recv(client_sock, request_buffer, sizeof(request_buffer), 0);
|
|
||||||
if (bytes_received == -1) {
|
|
||||||
std::cerr << "Failed to receive data" << std::endl;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
printf("received %lu bytes:\n", bytes_received);
|
|
||||||
for( size_t i = 0; i < bytes_received;)
|
|
||||||
{
|
|
||||||
printf("%4lx : ", i);
|
|
||||||
for( size_t col = 0; col < 8; ++col ) {
|
|
||||||
printf("%2x ", request_buffer[i++]);
|
|
||||||
}
|
|
||||||
printf("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bytes_received >= 8 ) {
|
|
||||||
RpcTag * tag = (RpcTag*) request_buffer;
|
|
||||||
switch( *tag ) {
|
|
||||||
case GET_STATUS: {
|
|
||||||
printf("RPC call get_status()\n");
|
|
||||||
StatusRequest * request = (StatusRequest * ) (request_buffer + sizeof(RpcTag));
|
|
||||||
|
|
||||||
SensorStatus status;
|
|
||||||
sensor.get_status(request, &status);
|
|
||||||
|
|
||||||
send(client_sock, (char*)&status, sizeof(status), 0);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case SET_SAMPLING_PERIOD: {
|
|
||||||
SetSamplingPeriodRequest * request = (SetSamplingPeriodRequest * ) (request_buffer + sizeof(RpcTag));
|
|
||||||
printf("RPC call set_sampling_period()\n");
|
|
||||||
|
|
||||||
SetSamplingPeriodResult result;
|
|
||||||
sensor.set_sampling_period(request, &result);
|
|
||||||
|
|
||||||
send(client_sock, (char*)&result, sizeof(result), 0);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case POP_DATA_CHUNK: {
|
|
||||||
PopDataChunkRequest * request = (PopDataChunkRequest * ) (request_buffer + sizeof(RpcTag));
|
|
||||||
printf("RPC call pop_data_chunk()\n");
|
|
||||||
|
|
||||||
DataChunk data_chunk;
|
|
||||||
sensor.pop_data_chunk(request, &data_chunk);
|
|
||||||
|
|
||||||
send(client_sock, (char*)&data_chunk, sizeof(data_chunk), 0);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
default: {
|
|
||||||
std::string response("invalid RPC");
|
|
||||||
send(client_sock, response.c_str(), response.length(), 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int run() {
|
|
||||||
int server_sock, client_sock;
|
|
||||||
struct sockaddr_in server_addr, client_addr;
|
|
||||||
socklen_t addr_len = sizeof(client_addr);
|
|
||||||
|
|
||||||
// Create socket
|
|
||||||
server_sock = socket(AF_INET, SOCK_STREAM, 0);
|
|
||||||
if (server_sock == -1) {
|
|
||||||
std::cerr << "Failed to create socket" << std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set up server address
|
|
||||||
server_addr.sin_family = AF_INET;
|
|
||||||
server_addr.sin_addr.s_addr = INADDR_ANY;
|
|
||||||
server_addr.sin_port = htons(port);
|
|
||||||
|
|
||||||
// Bind socket to the address and port
|
|
||||||
if (bind(server_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
|
|
||||||
std::cerr << "Failed to bind socket" << std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Listen for incoming connections
|
|
||||||
if (listen(server_sock, 3) == -1) {
|
|
||||||
std::cerr << "Failed to listen on socket" << std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
std::cout << "Server listening on port " << port << "..." << std::endl;
|
|
||||||
|
|
||||||
// Accept and handle client connections
|
|
||||||
while (true) {
|
|
||||||
client_sock = accept(server_sock, (struct sockaddr*)&client_addr, &addr_len);
|
|
||||||
if (client_sock == -1) {
|
|
||||||
std::cerr << "Failed to accept connection" << std::endl;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::cout << "Client connected!" << std::endl;
|
|
||||||
handle_client(client_sock);
|
|
||||||
|
|
||||||
// Close the client socket after handling the request
|
|
||||||
close(client_sock);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the server socket
|
|
||||||
close(server_sock);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
int main( int argc, char* argv[] ) {
|
int main( int argc, char* argv[] ) {
|
||||||
RandomSensor< 1048576 > sensor;
|
RandomSensor< 1048576 > sensor;
|
||||||
|
|
||||||
RpcService service(8070, sensor);
|
RpcService<SensorRpcTag> service(8070);
|
||||||
|
service.register_handler( GET_STATUS,
|
||||||
|
std::function([&sensor]( StatusRequest const & request, SensorStatus & status ) {
|
||||||
|
sensor.get_status(request, status);
|
||||||
|
}));
|
||||||
|
service.register_handler( SET_SAMPLING_PERIOD,
|
||||||
|
std::function([&sensor]( SetSamplingPeriodRequest const & request, SetSamplingPeriodResult & result ) {
|
||||||
|
sensor.set_sampling_period(request, result);
|
||||||
|
}));
|
||||||
|
service.register_handler( POP_DATA_CHUNK,
|
||||||
|
std::function([&sensor]( PopDataChunkRequest const & request, DataChunk & result ) {
|
||||||
|
sensor.pop_data_chunk(request, result);
|
||||||
|
}));
|
||||||
|
|
||||||
return service.run();
|
return service.run();
|
||||||
}
|
}
|
||||||
|
|
115
random-sensor-ldmc/rpc_service.hpp
Normal file
115
random-sensor-ldmc/rpc_service.hpp
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <cstring>
|
||||||
|
#include <map>
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
|
template < typename T_RpcTag >
|
||||||
|
struct RpcService {
|
||||||
|
private:
|
||||||
|
uint32_t port;
|
||||||
|
std::map< T_RpcTag, std::function<void( char const * input, size_t input_len, int client_sock )> > handler;
|
||||||
|
|
||||||
|
public:
|
||||||
|
RpcService(uint32_t port) :port(port)
|
||||||
|
{}
|
||||||
|
|
||||||
|
template < typename F, typename Request, typename Response >
|
||||||
|
void register_handler(T_RpcTag tag, std::function<F(Request const&, Response&)> f) {
|
||||||
|
handler.emplace(
|
||||||
|
tag,
|
||||||
|
std::function([f]( char const * request_buffer, size_t len, int client_sock ) {
|
||||||
|
Request * request = (Request*) (request_buffer + sizeof(T_RpcTag));
|
||||||
|
Response response;
|
||||||
|
f( *request, response );
|
||||||
|
send(client_sock, (void const*)&response, sizeof(response), 0);
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
void handle_client(int client_sock) {
|
||||||
|
char request_buffer[1024];
|
||||||
|
ssize_t bytes_received;
|
||||||
|
|
||||||
|
// Receive data from client
|
||||||
|
bytes_received = recv(client_sock, request_buffer, sizeof(request_buffer), 0);
|
||||||
|
if (bytes_received == -1) {
|
||||||
|
std::cerr << "Failed to receive data" << std::endl;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("received %lu bytes:\n", bytes_received);
|
||||||
|
for( size_t i = 0; i < bytes_received;)
|
||||||
|
{
|
||||||
|
printf("%4lx : ", i);
|
||||||
|
for( size_t col = 0; col < 8; ++col ) {
|
||||||
|
printf("%2x ", request_buffer[i++]);
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bytes_received >= sizeof(T_RpcTag) ) {
|
||||||
|
T_RpcTag * tag = (T_RpcTag*) request_buffer;
|
||||||
|
if( handler.contains(*tag) ){
|
||||||
|
(handler[*tag])( request_buffer, bytes_received, client_sock );
|
||||||
|
} else {
|
||||||
|
std::string response("invalid RPC");
|
||||||
|
send(client_sock, response.c_str(), response.length(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int run() {
|
||||||
|
int server_sock, client_sock;
|
||||||
|
struct sockaddr_in server_addr, client_addr;
|
||||||
|
socklen_t addr_len = sizeof(client_addr);
|
||||||
|
|
||||||
|
// Create socket
|
||||||
|
server_sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
if (server_sock == -1) {
|
||||||
|
std::cerr << "Failed to create socket" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set up server address
|
||||||
|
server_addr.sin_family = AF_INET;
|
||||||
|
server_addr.sin_addr.s_addr = INADDR_ANY;
|
||||||
|
server_addr.sin_port = htons(port);
|
||||||
|
|
||||||
|
// Bind socket to the address and port
|
||||||
|
if (bind(server_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
|
||||||
|
std::cerr << "Failed to bind socket" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listen for incoming connections
|
||||||
|
if (listen(server_sock, 3) == -1) {
|
||||||
|
std::cerr << "Failed to listen on socket" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
std::cout << "Server listening on port " << port << "..." << std::endl;
|
||||||
|
|
||||||
|
// Accept and handle client connections
|
||||||
|
while (true) {
|
||||||
|
client_sock = accept(server_sock, (struct sockaddr*)&client_addr, &addr_len);
|
||||||
|
if (client_sock == -1) {
|
||||||
|
std::cerr << "Failed to accept connection" << std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "Client connected!" << std::endl;
|
||||||
|
handle_client(client_sock);
|
||||||
|
|
||||||
|
// Close the client socket after handling the request
|
||||||
|
close(client_sock);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the server socket
|
||||||
|
close(server_sock);
|
||||||
|
}
|
||||||
|
};
|
Loading…
Add table
Reference in a new issue