#include <iostream> #include <chrono> #include <cmath> #include <random> #include <semaphore> #include <google/protobuf/arena.h> #include <thread> #include <grpc/grpc.h> #include <grpcpp/security/server_credentials.h> #include <grpcpp/server.h> #include <grpcpp/server_builder.h> #include <grpcpp/server_context.h> #include "sensor.pb.h" #include "sensor.grpc.pb.h" template < size_t T_Capacity > struct RandomSensor : public Sensor::Service { private: google::protobuf::Arena arena; std::chrono::time_point<std::chrono::high_resolution_clock> online_since; std::chrono::duration<uint64_t, std::milli> period_duration; uint32_t chunk_size; uint32_t next_free; uint32_t next_full; float bat_charge; std::counting_semaphore<> sem_full; std::counting_semaphore<> sem_free; float data[ T_Capacity ]; std::thread sensor_thread; void generate_value() { sem_free.acquire(); data[ next_free ] = (float) next_free / 100.0; next_free = (next_free + 1) % T_Capacity; sem_full.release(); } public: RandomSensor() : online_since(std::chrono::high_resolution_clock::now()) , period_duration( 10 ) , chunk_size( 8192 ) , next_free( 0 ) , next_full( 0 ) , bat_charge( 8200.0 ) , sem_full( 0 ) , sem_free( T_Capacity ) , sensor_thread([&] { while(bat_charge > 0) { generate_value(); bat_charge -= 0.01; std::this_thread::sleep_for( period_duration ); } }) {} ~RandomSensor() { sensor_thread.join(); } uint32_t n_chunks_capacity() const { return T_Capacity / chunk_size; } uint32_t n_full_chunks() const { return (( next_free - next_full ) % T_Capacity) / chunk_size; } uint32_t n_empty_chunks() const { return n_chunks_capacity() - n_full_chunks(); } grpc::Status set_sampling_period( grpc::ServerContext * context, Duration const * new_sampling_period, SetSamplingPeriodResult * result ) override { // todo return grpc::Status::OK; } grpc::Status pop_data_chunk( grpc::ServerContext * context, DataChunkRequest const * request, DataChunk * chunk ) override { Duration * rate = google::protobuf::Arena::Create<Duration>(&arena); rate->set_millis(period_duration.count()); chunk->set_allocated_period(rate); for( uint32_t i = 0; i < chunk_size; ++i ) { Temperature * temp = chunk->add_data(); sem_full.acquire(); temp->set_celsius( data[next_full] ); next_full = (next_full + 1) % T_Capacity; sem_free.release(); } return grpc::Status::OK; } grpc::Status get_status( grpc::ServerContext * context, StatusRequest const * request, SensorStatus * status ) override { status->set_name("random sensor (protobuf)"); status->mutable_online_since()->set_millis_since_epoch( std::chrono::duration_cast<std::chrono::milliseconds>(online_since.time_since_epoch()).count() ); status->mutable_battery_capacity()->set_mah(8200); status->mutable_battery_charge()->set_mah(bat_charge); status->mutable_cur_sampling_period()->set_millis(period_duration.count()); status->mutable_max_sampling_period()->set_millis(10); status->set_max_chunk_size(T_Capacity); status->set_cur_chunk_size(chunk_size); status->set_n_chunk_capacity( n_chunks_capacity() ); status->set_n_full_data_chunks( n_full_chunks() ); status->set_n_empty_data_chunks( n_empty_chunks() ); return grpc::Status::OK; } }; int main() { RandomSensor< 1048576 > sensor; std::string server_address("0.0.0.0:50051"); grpc::ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&sensor); std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; server->Wait(); return 0; }