diff --git a/random-sensor-ldmc/get_status.sh b/random-sensor-ldmc/get_status.sh new file mode 100755 index 0000000..8acbaaa --- /dev/null +++ b/random-sensor-ldmc/get_status.sh @@ -0,0 +1,25 @@ +#!/bin/sh + + RPC_HOST="localhost" + RPC_PORT="8070" + RPC_TAG="0" + + echo -n ${RPC_TAG} \ + | morph " RpcTag ~ ℕ + ~ <PosInt 10 BigEndian> + ~ < Seq~<ValueTerminated 0> + <Digit 10> + ~ Char ~ Ascii ~ x86.UInt8 + >" \ + " RpcTag ~ ℕ + ~ x86.UInt64" \ + | nc ${RPC_HOST} ${RPC_PORT} \ + | morph " TimePoint + ~ <TimeSince UnixEpoch> + ~ Duration ~ Seconds + ~ ℝ ~ <QuantizedLinear 0 1 1000> + ~ ℕ ~ x86.UInt64" \ + " TimePoint + ~ ISO8601 + ~ <Seq Char~Unicode> ~ UTF-8 + ~ <Seq~<ValueTerminated 0> x86.UInt8>" diff --git a/random-sensor-ldmc/meson.build b/random-sensor-ldmc/meson.build new file mode 100644 index 0000000..935f67a --- /dev/null +++ b/random-sensor-ldmc/meson.build @@ -0,0 +1,6 @@ +project('random-sensor-ldmc', 'cpp', +default_options: ['cpp_std=c++20']) + +executable('random-sensor-ldmc', + ['random-sensor.cpp'] +) diff --git a/random-sensor-ldmc/random-sensor.cpp b/random-sensor-ldmc/random-sensor.cpp new file mode 100644 index 0000000..91a73ea --- /dev/null +++ b/random-sensor-ldmc/random-sensor.cpp @@ -0,0 +1,283 @@ +#include <iostream> +#include <chrono> +#include <cmath> +#include <random> +#include <semaphore> +#include <google/protobuf/arena.h> +#include <thread> + + +struct StatusRequest { +}; +struct SensorStatus { + uint64_t online_since; + uint32_t battery_charge; + uint32_t battery_capacity; + uint32_t max_sampling_rate; + uint32_t cur_sampling_rate; + uint32_t max_chunk_size; + uint32_t cur_chunk_size; + uint32_t n_chunk_capacity; + uint32_t n_full_data_chunks; + uint32_t n_empty_data_chunks; +}; + + +struct SetSamplingPeriodRequest { + uint32_t new_sampling_period; +}; +enum SetSamplingPeriodResult { + SAMPLING_PERIOD_OK, + SAMPLING_PERIOD_OUT_OF_RANGE +}; + +struct PopDataChunkRequest { +}; +struct DataChunk { + uint64_t begin; + uint32_t sampling_period; + float temperature_data[]; +}; + +template < size_t T_Capacity > +struct RandomSensor +{ +private: + std::chrono::time_point<std::chrono::high_resolution_clock> online_since; + std::chrono::duration<uint64_t, std::milli> period_duration; + uint32_t chunk_size; + uint32_t next_free; + uint32_t next_full; + + float bat_charge; + + std::counting_semaphore<> sem_full; + std::counting_semaphore<> sem_free; + + float data[ T_Capacity ]; + std::thread sensor_thread; + + void generate_value() + { + sem_free.acquire(); + + data[ next_free ] = (float) next_free / 100.0; + next_free = (next_free + 1) % T_Capacity; + + sem_full.release(); + } + +public: + RandomSensor() + : online_since(std::chrono::high_resolution_clock::now()) + , period_duration( 10 ) + , chunk_size( 8192 ) + , next_free( 0 ) + , next_full( 0 ) + , bat_charge( 8200.0 ) + , sem_full( 0 ) + , sem_free( T_Capacity ) + , sensor_thread([&] { + while(bat_charge > 0) { + generate_value(); + bat_charge -= 0.01; + std::this_thread::sleep_for( period_duration ); + } + }) + {} + + ~RandomSensor() { + sensor_thread.join(); + } + + uint32_t n_chunks_capacity() const { + return T_Capacity / chunk_size; + } + + uint32_t n_full_chunks() const { + return (( next_free - next_full ) % T_Capacity) / chunk_size; + } + + uint32_t n_empty_chunks() const { + return n_chunks_capacity() - n_full_chunks(); + } + + + void get_status( + StatusRequest const * request, + SensorStatus * status + ) { + status->online_since = std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count(); + status->battery_capacity = 8200; + status->battery_charge = bat_charge; + status->max_sampling_rate = 10; + status->cur_sampling_rate = std::chrono::duration_cast<std::chrono::milliseconds>(period_duration).count(); + status->cur_chunk_size = chunk_size; + status->n_chunk_capacity = n_chunks_capacity(); + status->n_full_data_chunks = n_full_chunks(); + status->n_empty_data_chunks = n_empty_chunks(); + } + + void set_sampling_period( + SetSamplingPeriodRequest const * request, + SetSamplingPeriodResult * result + ) { + *result = SAMPLING_PERIOD_OK; + } + + void pop_data_chunk( + PopDataChunkRequest const * request, + DataChunk * data_chunk + ) { + + } +}; + +#include <iostream> +#include <string> +#include <unistd.h> +#include <arpa/inet.h> +#include <sys/socket.h> +#include <cstring> + + +enum RpcTag { + GET_STATUS = 0, + SET_SAMPLING_PERIOD, + POP_DATA_CHUNK +}; + +struct RpcService { +private: + uint32_t port; + RandomSensor< 1048576 > & sensor; + +public: + RpcService( + uint32_t port, + RandomSensor< 1048576 > & sensor + ) :port(port), sensor(sensor) + { + } + + void handle_client(int client_sock) { + char request_buffer[1024]; + ssize_t bytes_received; + + // Receive data from client + bytes_received = recv(client_sock, request_buffer, sizeof(request_buffer), 0); + if (bytes_received == -1) { + std::cerr << "Failed to receive data" << std::endl; + return; + } + + printf("received %lu bytes:\n", bytes_received); + for( size_t i = 0; i < bytes_received;) + { + printf("%4lx : ", i); + for( size_t col = 0; col < 8; ++col ) { + printf("%2x ", request_buffer[i++]); + } + printf("\n"); + } + + if (bytes_received >= 8 ) { + RpcTag * tag = (RpcTag*) request_buffer; + switch( *tag ) { + case GET_STATUS: { + printf("RPC call get_status()\n"); + StatusRequest * request = (StatusRequest * ) (request_buffer + sizeof(RpcTag)); + + SensorStatus status; + sensor.get_status(request, &status); + + send(client_sock, (char*)&status, sizeof(status), 0); + break; + } + + case SET_SAMPLING_PERIOD: { + SetSamplingPeriodRequest * request = (SetSamplingPeriodRequest * ) (request_buffer + sizeof(RpcTag)); + printf("RPC call set_sampling_period()\n"); + + SetSamplingPeriodResult result; + sensor.set_sampling_period(request, &result); + + send(client_sock, (char*)&result, sizeof(result), 0); + break; + } + + case POP_DATA_CHUNK: { + PopDataChunkRequest * request = (PopDataChunkRequest * ) (request_buffer + sizeof(RpcTag)); + printf("RPC call pop_data_chunk()\n"); + + DataChunk data_chunk; + sensor.pop_data_chunk(request, &data_chunk); + + send(client_sock, (char*)&data_chunk, sizeof(data_chunk), 0); + break; + } + + default: { + std::string response("invalid RPC"); + send(client_sock, response.c_str(), response.length(), 0); + } + } + } + } + + int run() { + int server_sock, client_sock; + struct sockaddr_in server_addr, client_addr; + socklen_t addr_len = sizeof(client_addr); + + // Create socket + server_sock = socket(AF_INET, SOCK_STREAM, 0); + if (server_sock == -1) { + std::cerr << "Failed to create socket" << std::endl; + return 1; + } + + // Set up server address + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port); + + // Bind socket to the address and port + if (bind(server_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { + std::cerr << "Failed to bind socket" << std::endl; + return 1; + } + + // Listen for incoming connections + if (listen(server_sock, 3) == -1) { + std::cerr << "Failed to listen on socket" << std::endl; + return 1; + } + std::cout << "Server listening on port " << port << "..." << std::endl; + + // Accept and handle client connections + while (true) { + client_sock = accept(server_sock, (struct sockaddr*)&client_addr, &addr_len); + if (client_sock == -1) { + std::cerr << "Failed to accept connection" << std::endl; + continue; + } + + std::cout << "Client connected!" << std::endl; + handle_client(client_sock); + + // Close the client socket after handling the request + close(client_sock); + } + + // Close the server socket + close(server_sock); + } +}; + +int main( int argc, char* argv[] ) { + RandomSensor< 1048576 > sensor; + + RpcService service(8070, sensor); + return service.run(); +} diff --git a/random-sensor-ldmc/sensor-internal.lt b/random-sensor-ldmc/sensor-internal.lt new file mode 100644 index 0000000..0afb750 --- /dev/null +++ b/random-sensor-ldmc/sensor-internal.lt @@ -0,0 +1,30 @@ +include "../sensor.lt"; + +type native.SensorStatus = SensorStatus ~ { + online_since : TimePoint ~ <TimeSince UnixEpoch> ~ Milliseconds ~ ℤ ~ native.UInt64 ; + battery_charge : Energy ~ mAh ~ native.UInt32 ; + battery_capacity : Energy ~ mAh ~ native.UInt32 ; + min_sampling_period : Duration ~ Milliseconds ~ native.UInt32 ; + cur_sampling_period : Duration ~ Milliseconds ~ native.UInt32 ; + max_chunk_size : ℤ ~ native.UInt32 ; + cur_chunk_size : ℤ ~ native.UInt32 ; + n_chunk_capacity : ℤ ~ native.UInt32 ; + n_full_data_chunks : ℤ ~ native.UInt32 ; + n_empty_data_chunks : ℤ ~ native.UInt32 ; +} ; + +type native.DataChunk = DataChunk ~ { + begin : TimePoint ~ <TimeSince UnixEpoch> ~ Milliseconds ~ native.UInt64 ; + data : [~<LengthPrefix x86.UInt32> + Temperature + ~ Celsius + ~ ℝ + ~ native.Float64 + ] ; +} ; + +type native.Sensor = Sensor ~ { + get_status : {} -> InternSensorStatus ; + set_sampling_period : Duration~Milliseconds~UInt32 -> (Ok | OutOfRange)~Byte ; + pop_data_chunk : {} -> InternDataChunk ; +} ; diff --git a/sensor.lt b/sensor.lt index 7172b65..5a6a84d 100644 --- a/sensor.lt +++ b/sensor.lt @@ -15,6 +15,7 @@ trait SensorStatus = { trait DataChunk = { begin : TimePoint ; + sampling_period: Duration ; data : [ Temperature ] ; }