From 76e33698445629a9d797a4de522e8942d936bfa3 Mon Sep 17 00:00:00 2001
From: Michael Sippel <micha@fragmental.art>
Date: Wed, 19 Mar 2025 17:29:57 +0100
Subject: [PATCH] add initial ldmc variant for random-sensor

---
 random-sensor-ldmc/get_status.sh      |  25 +++
 random-sensor-ldmc/meson.build        |   6 +
 random-sensor-ldmc/random-sensor.cpp  | 283 ++++++++++++++++++++++++++
 random-sensor-ldmc/sensor-internal.lt |  30 +++
 sensor.lt                             |   1 +
 5 files changed, 345 insertions(+)
 create mode 100755 random-sensor-ldmc/get_status.sh
 create mode 100644 random-sensor-ldmc/meson.build
 create mode 100644 random-sensor-ldmc/random-sensor.cpp
 create mode 100644 random-sensor-ldmc/sensor-internal.lt

diff --git a/random-sensor-ldmc/get_status.sh b/random-sensor-ldmc/get_status.sh
new file mode 100755
index 0000000..8acbaaa
--- /dev/null
+++ b/random-sensor-ldmc/get_status.sh
@@ -0,0 +1,25 @@
+#!/bin/sh
+
+    RPC_HOST="localhost"
+    RPC_PORT="8070"
+    RPC_TAG="0"
+
+    echo -n ${RPC_TAG} \
+    | morph " RpcTag ~ ℕ
+         ~ <PosInt 10 BigEndian>
+         ~ < Seq~<ValueTerminated 0>
+               <Digit 10>
+             ~ Char ~ Ascii ~ x86.UInt8
+           >"  \
+        "  RpcTag ~ ℕ
+         ~ x86.UInt64" \
+    | nc ${RPC_HOST} ${RPC_PORT} \
+    | morph "  TimePoint
+         ~ <TimeSince UnixEpoch>
+         ~ Duration ~ Seconds
+         ~ ℝ ~ <QuantizedLinear 0 1 1000>
+         ~ ℕ ~ x86.UInt64" \
+        "  TimePoint
+         ~ ISO8601
+         ~ <Seq Char~Unicode> ~ UTF-8
+         ~ <Seq~<ValueTerminated 0> x86.UInt8>"
diff --git a/random-sensor-ldmc/meson.build b/random-sensor-ldmc/meson.build
new file mode 100644
index 0000000..935f67a
--- /dev/null
+++ b/random-sensor-ldmc/meson.build
@@ -0,0 +1,6 @@
+project('random-sensor-ldmc', 'cpp',
+default_options: ['cpp_std=c++20'])
+
+executable('random-sensor-ldmc',
+    ['random-sensor.cpp']
+)
diff --git a/random-sensor-ldmc/random-sensor.cpp b/random-sensor-ldmc/random-sensor.cpp
new file mode 100644
index 0000000..91a73ea
--- /dev/null
+++ b/random-sensor-ldmc/random-sensor.cpp
@@ -0,0 +1,283 @@
+#include <iostream>
+#include <chrono>
+#include <cmath>
+#include <random>
+#include <semaphore>
+#include <google/protobuf/arena.h>
+#include <thread>
+
+
+struct StatusRequest {
+};
+struct SensorStatus {
+    uint64_t online_since;
+    uint32_t battery_charge;
+    uint32_t battery_capacity;
+    uint32_t max_sampling_rate;
+    uint32_t cur_sampling_rate;
+    uint32_t max_chunk_size;
+    uint32_t cur_chunk_size;
+    uint32_t n_chunk_capacity;
+    uint32_t n_full_data_chunks;
+    uint32_t n_empty_data_chunks;
+};
+
+
+struct SetSamplingPeriodRequest {
+    uint32_t new_sampling_period;
+};
+enum SetSamplingPeriodResult {
+    SAMPLING_PERIOD_OK,
+    SAMPLING_PERIOD_OUT_OF_RANGE
+};
+
+struct PopDataChunkRequest {
+};
+struct DataChunk {
+    uint64_t begin;
+    uint32_t sampling_period;
+    float temperature_data[];
+};
+
+template < size_t T_Capacity >
+struct RandomSensor
+{
+private:
+    std::chrono::time_point<std::chrono::high_resolution_clock> online_since;
+    std::chrono::duration<uint64_t, std::milli> period_duration;
+    uint32_t chunk_size;
+    uint32_t next_free;
+    uint32_t next_full;
+
+    float bat_charge;
+
+    std::counting_semaphore<> sem_full;
+    std::counting_semaphore<> sem_free;
+
+    float data[ T_Capacity ];
+    std::thread sensor_thread;
+
+    void generate_value()
+    {
+        sem_free.acquire();
+
+        data[ next_free ] = (float) next_free / 100.0;
+        next_free = (next_free + 1) % T_Capacity;
+
+        sem_full.release();
+    }
+
+public:
+    RandomSensor()
+        : online_since(std::chrono::high_resolution_clock::now())
+        , period_duration( 10 )
+        , chunk_size( 8192 )
+        , next_free( 0 )
+        , next_full( 0 )
+        , bat_charge( 8200.0 )
+        , sem_full( 0 )
+        , sem_free( T_Capacity )
+        , sensor_thread([&] {
+            while(bat_charge > 0) {
+                generate_value();
+                bat_charge -= 0.01;
+                std::this_thread::sleep_for( period_duration );
+            }
+        })
+    {}
+
+    ~RandomSensor() {
+        sensor_thread.join();
+    }
+
+    uint32_t n_chunks_capacity() const {
+        return T_Capacity / chunk_size;
+    }
+
+    uint32_t n_full_chunks() const {
+        return (( next_free - next_full ) % T_Capacity) / chunk_size;
+    }
+
+    uint32_t n_empty_chunks() const {
+        return n_chunks_capacity() - n_full_chunks();
+    }
+
+
+    void get_status(
+        StatusRequest const * request,
+        SensorStatus * status
+    ) {
+        status->online_since = std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count();
+        status->battery_capacity = 8200;
+        status->battery_charge = bat_charge;
+        status->max_sampling_rate = 10;
+        status->cur_sampling_rate = std::chrono::duration_cast<std::chrono::milliseconds>(period_duration).count();
+        status->cur_chunk_size = chunk_size;
+        status->n_chunk_capacity = n_chunks_capacity();
+        status->n_full_data_chunks = n_full_chunks();
+        status->n_empty_data_chunks = n_empty_chunks();
+    }
+
+    void set_sampling_period(
+        SetSamplingPeriodRequest const * request,
+        SetSamplingPeriodResult * result
+    ) {
+        *result = SAMPLING_PERIOD_OK;
+    }
+
+    void pop_data_chunk(
+        PopDataChunkRequest const * request,
+        DataChunk * data_chunk
+    ) {
+
+    }
+};
+
+#include <iostream>
+#include <string>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <cstring>
+
+
+enum RpcTag {
+    GET_STATUS = 0,
+    SET_SAMPLING_PERIOD,
+    POP_DATA_CHUNK
+};
+
+struct RpcService {
+private:
+    uint32_t port;
+    RandomSensor< 1048576 > & sensor;
+
+public:
+    RpcService(
+        uint32_t port,
+        RandomSensor< 1048576 > & sensor
+    ) :port(port), sensor(sensor)
+    {
+    }
+
+    void handle_client(int client_sock) {
+        char request_buffer[1024];
+        ssize_t bytes_received;
+
+        // Receive data from client
+        bytes_received = recv(client_sock, request_buffer, sizeof(request_buffer), 0);
+        if (bytes_received == -1) {
+            std::cerr << "Failed to receive data" << std::endl;
+            return;
+        }
+
+        printf("received %lu bytes:\n", bytes_received);
+        for( size_t i = 0; i < bytes_received;)
+        {
+            printf("%4lx : ", i);
+            for( size_t col = 0; col < 8; ++col ) {
+                printf("%2x ", request_buffer[i++]);
+            }
+            printf("\n");
+        }
+
+        if (bytes_received >= 8 ) {
+            RpcTag * tag = (RpcTag*) request_buffer;
+            switch( *tag ) {
+                case GET_STATUS: {
+                    printf("RPC call get_status()\n");
+                    StatusRequest * request = (StatusRequest * ) (request_buffer + sizeof(RpcTag));
+
+                    SensorStatus status;
+                    sensor.get_status(request, &status);
+
+                    send(client_sock, (char*)&status, sizeof(status), 0);
+                    break;
+                }
+
+                case SET_SAMPLING_PERIOD: {
+                    SetSamplingPeriodRequest * request = (SetSamplingPeriodRequest * ) (request_buffer + sizeof(RpcTag));
+                    printf("RPC call set_sampling_period()\n");
+
+                    SetSamplingPeriodResult result;
+                    sensor.set_sampling_period(request, &result);
+
+                    send(client_sock, (char*)&result, sizeof(result), 0);
+                    break;
+                }
+
+                case POP_DATA_CHUNK: {
+                    PopDataChunkRequest * request = (PopDataChunkRequest * ) (request_buffer + sizeof(RpcTag));
+                    printf("RPC call pop_data_chunk()\n");
+
+                    DataChunk data_chunk;
+                    sensor.pop_data_chunk(request, &data_chunk);
+
+                    send(client_sock, (char*)&data_chunk, sizeof(data_chunk), 0);
+                    break;
+                }
+
+                default: {
+                    std::string response("invalid RPC");
+                    send(client_sock, response.c_str(), response.length(), 0);
+                }
+            }
+        }
+    }
+
+    int run() {
+        int server_sock, client_sock;
+            struct sockaddr_in server_addr, client_addr;
+            socklen_t addr_len = sizeof(client_addr);
+
+            // Create socket
+            server_sock = socket(AF_INET, SOCK_STREAM, 0);
+            if (server_sock == -1) {
+                std::cerr << "Failed to create socket" << std::endl;
+                return 1;
+            }
+
+            // Set up server address
+            server_addr.sin_family = AF_INET;
+            server_addr.sin_addr.s_addr = INADDR_ANY;
+            server_addr.sin_port = htons(port);
+
+            // Bind socket to the address and port
+            if (bind(server_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) {
+                std::cerr << "Failed to bind socket" << std::endl;
+                return 1;
+            }
+
+            // Listen for incoming connections
+            if (listen(server_sock, 3) == -1) {
+                std::cerr << "Failed to listen on socket" << std::endl;
+                return 1;
+            }
+            std::cout << "Server listening on port " << port << "..." << std::endl;
+
+            // Accept and handle client connections
+            while (true) {
+                client_sock = accept(server_sock, (struct sockaddr*)&client_addr, &addr_len);
+                if (client_sock == -1) {
+                    std::cerr << "Failed to accept connection" << std::endl;
+                    continue;
+                }
+
+                std::cout << "Client connected!" << std::endl;
+                handle_client(client_sock);
+
+                // Close the client socket after handling the request
+                close(client_sock);
+            }
+
+            // Close the server socket
+            close(server_sock);
+    }
+};
+
+int main( int argc, char* argv[] ) {
+    RandomSensor< 1048576 > sensor;
+
+    RpcService service(8070, sensor);
+    return service.run();
+}
diff --git a/random-sensor-ldmc/sensor-internal.lt b/random-sensor-ldmc/sensor-internal.lt
new file mode 100644
index 0000000..0afb750
--- /dev/null
+++ b/random-sensor-ldmc/sensor-internal.lt
@@ -0,0 +1,30 @@
+include "../sensor.lt";
+
+type native.SensorStatus = SensorStatus ~ {
+    online_since : TimePoint ~ <TimeSince UnixEpoch> ~ Milliseconds ~ ℤ ~ native.UInt64 ;
+    battery_charge : Energy ~ mAh ~ native.UInt32 ;
+    battery_capacity : Energy ~ mAh ~ native.UInt32 ;
+    min_sampling_period : Duration ~ Milliseconds ~ native.UInt32 ;
+    cur_sampling_period : Duration ~ Milliseconds ~ native.UInt32 ;
+    max_chunk_size : ℤ ~ native.UInt32 ;
+    cur_chunk_size : ℤ ~ native.UInt32 ;
+    n_chunk_capacity : ℤ ~ native.UInt32 ;
+    n_full_data_chunks : ℤ ~ native.UInt32 ;
+    n_empty_data_chunks : ℤ ~ native.UInt32 ;
+} ;
+
+type native.DataChunk = DataChunk ~ {
+    begin : TimePoint ~ <TimeSince UnixEpoch> ~ Milliseconds ~ native.UInt64 ;
+    data : [~<LengthPrefix x86.UInt32>
+                  Temperature
+               ~  Celsius
+               ~  ℝ
+               ~  native.Float64
+           ] ;
+} ;
+
+type native.Sensor = Sensor ~ {
+    get_status : {} -> InternSensorStatus ;
+    set_sampling_period : Duration~Milliseconds~UInt32 -> (Ok | OutOfRange)~Byte ;
+    pop_data_chunk : {} -> InternDataChunk ;
+} ;
diff --git a/sensor.lt b/sensor.lt
index 7172b65..5a6a84d 100644
--- a/sensor.lt
+++ b/sensor.lt
@@ -15,6 +15,7 @@ trait SensorStatus = {
 
 trait DataChunk = {
     begin : TimePoint ;
+    sampling_period: Duration ;
     data : [ Temperature ] ;
 }