00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef rock_h
00026 #define rock_h
00027
00028 #ifndef _REENTRANT
00029 #define _REENTRANT
00030 #endif
00031
00032 #include "config.h"
00033 #include "datagram.h"
00034 #include "base.h"
00035 #include "data.h"
00036 #include "util.h"
00037 #include "protocol.h"
00038 #include <map>
00039 #include <vector>
00040 #include <list>
00041 #include <glib.h>
00042 #include <stdint.h>
00043 #include <stdexcept>
00044 #include <sys/types.h>
00045 #include <unistd.h>
00046
00047 using std::vector;
00048 using std::list;
00049 using std::map;
00050 using std::multimap;
00051 using std::exception;
00052
00053 namespace karoo {
00054 class rock;
00055 class rock_datagram_message;
00056
00061 class rock_datagram_reply : public pebble
00062 {
00063 private:
00069 void setAckMessage(rock_datagram_message* ack_msg);
00070
00071 protected:
00072 friend class rock;
00074 rock_datagram_message* msg;
00080 rock_datagram_message* ack_msg;
00081
00082 public:
00088 rock_datagram_reply(rock_datagram_message* msg);
00089 virtual ~rock_datagram_reply();
00090
00100 virtual void run() = 0;
00101 };
00102
00106 class rock_datagram_message : public datagram_message, public rock_data
00107 {
00108 protected:
00110 size_t pos;
00112 rock* parent;
00113
00118 rock_datagram_reply* reply;
00119
00121 text recipient_name;
00122
00132 size_t readFrom(void* data, size_t size, size_t p) const;
00133 public:
00139 rock_datagram_message(size_t alloc_size, rock* parent);
00148 rock_datagram_message(size_t alloc_size, const text& ip, int port, rock* parent);
00149 virtual ~rock_datagram_message();
00150
00156 datagram_message* clone(size_t alloc_size = 0) const;
00165 datagram_message* clone(int port, size_t alloc_size = 0) const;
00166
00170 size_t getReadPos() const { return pos; }
00171
00173 void resetReadPos(size_t pos = 0) { this->pos = pos; }
00174
00176 size_t available() { return getSize() - pos; }
00177
00186 void write(const void* data, size_t size);
00195 void read(void* data, size_t size);
00196
00203 void accomodate(size_t bytes);
00204
00205
00214 uint32_t getType();
00223 uint64_t getSequence();
00232 int32_t getSourcePort();
00241 text getName();
00242
00247 void setReply(rock_datagram_reply* reply);
00248
00253 rock_datagram_reply* getReply();
00254
00262 text getHost() const;
00267 int32_t getPort() const;
00268
00276 void run();
00277
00278
00283 void setRecipientName(const text& recipient_name) { this->recipient_name = recipient_name; }
00284
00288 text getRecipientName() const { return recipient_name; }
00289 };
00290
00300 class rock_datagram_factory : public datagram_message_factory
00301 {
00302 private:
00303 protected:
00304 rock* parent;
00305 public:
00306 rock_datagram_factory(rock* parent) { this->parent = parent; }
00307 virtual ~rock_datagram_factory() {}
00308
00313 datagram_message* create_datagram_message(size_t alloc_size) {
00314 return new rock_datagram_message(alloc_size, parent);
00315 }
00316 };
00317
00324 class rock_datagram_message_handler : public referable
00325 {
00326 protected:
00327 rock* parent;
00328 public:
00329 rock_datagram_message_handler(rock* parent) { this->parent = parent; }
00330 virtual ~rock_datagram_message_handler() {}
00331
00346 rock_datagram_message* createAckMessage(const rock_datagram_message* msg, uint64_t seq, int32_t port);
00352 void sendAck(rock_datagram_message* reply);
00362 void createAndSendAck(const rock_datagram_message* msg, uint64_t seq, int32_t port);
00370 rock_datagram_message* createReplyMessage(const rock_datagram_message* msg, int32_t port, const text& name, uint32_t type);
00376 void sendMessage(rock_datagram_message* msg);
00413 virtual bool handle(uint32_t message_type, uint64_t seq, int32_t port, const text& name, rock_datagram_message* msg) = 0;
00414 };
00415
00416 class default_rock_datagram_message_handler : public rock_datagram_message_handler
00417 {
00418 private:
00423 synchronised<uint64_t> bye_seq;
00424 friend class rock;
00425 public:
00426 default_rock_datagram_message_handler(rock* parent);
00427 virtual ~default_rock_datagram_message_handler();
00428 virtual bool handle(uint32_t message_type, uint64_t seq, int32_t port, const text& name, rock_datagram_message* msg);
00429 };
00430
00431 class message_handler_runner : public pebble
00432 {
00433 protected:
00434 rock* parent;
00435 rock_datagram_message* msg;
00436 uint32_t msg_type;
00437 uint64_t seq;
00438 int32_t port;
00439 text name;
00440 public:
00441 message_handler_runner(rock* parent, rock_datagram_message* msg, uint32_t msg_type, uint64_t seq, int32_t port, const text& name);
00442 virtual ~message_handler_runner();
00443 void run();
00444 };
00445
00446 class rock_long_message : public referable
00447 {
00448 protected:
00449 rock* parent;
00450 rock_datagram_message* hdr;
00451 uint64_t seq;
00452 size_t size;
00453 size_t num_packets;
00454 unsigned char** packets;
00455 size_t* packet_size;
00456 public:
00457 rock_long_message(rock* parent, rock_datagram_message* hdr, uint64_t seq, size_t size, size_t num_packets);
00458 virtual ~rock_long_message();
00459
00460 void addPacket(size_t pkt_num, size_t this_size, unsigned char* data);
00461 bool gotAllPackets() const;
00462 void dispatch() const;
00463 };
00464
00465 class cached_sequence {
00466 protected:
00467 uint64_t seq;
00468 time_t ts;
00469 public:
00470 cached_sequence();
00471 cached_sequence(uint64_t seq);
00472 cached_sequence(const cached_sequence& ref);
00473 virtual ~cached_sequence();
00474
00475 uint64_t getSeq() const { return seq; }
00476 time_t getTime() const { return ts; }
00477 void touch();
00478
00479 const cached_sequence& operator=(const cached_sequence& ref);
00480 };
00481
00482 class remote_rock : public referable {
00483 protected:
00484 mutable karoo_mutex sem;
00485
00486 rock* parent;
00487 text name;
00488 text type;
00489 text host;
00490 int32_t port;
00491 pid_t pid;
00492 map<text,text> args;
00493 text path;
00494 text file;
00495 time_t dying;
00496 int8_t load_factor;
00497 map<uint64_t,cached_sequence> recent_seq_array;
00498 time_t validated;
00499 time_t executed;
00500 time_t startup_time_allowance;
00501 time_t shutdown_time_allowance;
00502 time_t pinged;
00503 time_t exe_file_mtime;
00504 map<uint64_t,rock_long_message*> long_array;
00505 karoo_mutex waiting_sem;
00506 list<rock_datagram_message*> waiting;
00507
00508 class ping_reply : public rock_datagram_reply
00509 {
00510 private:
00511 remote_rock* parent;
00512 public:
00513 ping_reply(remote_rock* parent, rock_datagram_message* msg)
00514 : rock_datagram_reply(msg) { this->parent = parent; }
00515 virtual ~ping_reply() {}
00516 void run();
00517 };
00518
00519 public:
00520 remote_rock(rock* parent, const text& name);
00521 remote_rock(rock* parent, const remote_rock& ref);
00522 remote_rock(rock* parent, const text& name, const text& type, const text& host, int32_t port, int8_t load_factor, time_t startup_time_allowance = (time_t)15, time_t shutdown_time_allowance = (time_t)15);
00523 virtual ~remote_rock();
00524
00525 bool operator==(const remote_rock& ref) const;
00526
00527 void setArgs(const map<text,text>& args);
00528 const map<text,text>& getArgs() const { return args; }
00529
00530 void setPath(const text& path) { this->path = path; }
00531 void setFile(const text& file) { this->file = file; }
00532 text getPath() const { return path; }
00533 text getFile() const { return file; }
00534
00535 text getName() const { return name; }
00536 text getType() const { return type; }
00537 text getHost() const { return host; }
00538 int32_t getPort() const { return port; }
00539 uint8_t getLoadFactor() const { return load_factor; }
00540
00541 void hello(const text& type, const text& host, int32_t port);
00542 void validate();
00543 void validate(const text& host, int32_t port);
00544 void validate(const text& type, const text& host, int32_t port, uint8_t load_factor);
00545 void invalidate();
00546 time_t getValidated() const;
00547 time_t getExecuted() const;
00548 time_t getStartupTimeAllowance() const;
00549 time_t getShutdownTimeAllowance() const;
00550 bool getExeFileChanged();
00551 time_t getPinged() const;
00552
00553 bool containsSeq(uint64_t seq) const;
00554 void addSeq(uint64_t seq);
00555
00556 void addWaiting(rock_datagram_message* to_send);
00557 void sendWaiting();
00558 void nackWaiting();
00559
00569 rock_long_message* addLongMessagePacket(uint64_t hdr_seq, size_t pkt_num, size_t this_size, unsigned char* data);
00570 void addLongMessageHeader(rock* parent, rock_datagram_message* hdr_msg, uint64_t seq, size_t long_size, size_t num_packets);
00571
00576 void execute();
00577
00586 void kill();
00587
00594 void ping();
00595
00596 time_t inDeathThrows() const;
00597
00603 pid_t getPID() const;
00604 };
00605
00606 class rock_datagram_message_ack : public pebble
00607 {
00608 private:
00609 rock* parent;
00610 uint64_t seq;
00611 rock_datagram_message* msg;
00612 int retry_count;
00613 int retry_delay;
00614
00615 public:
00616 rock_datagram_message_ack(rock* parent, uint64_t seq, rock_datagram_message* msg, int retry_delay);
00617 virtual ~rock_datagram_message_ack();
00618
00619 void run();
00620 };
00621
00622
00626 typedef struct {
00628 text key;
00630 text value;
00631 } argtuple;
00632
00636 class rock
00637 {
00638 private:
00640 argtuple* args;
00642 int args_size;
00643
00645 vector<text> arguments;
00646
00651 text name;
00652
00654 text host;
00656 int port;
00658 int self_port;
00660 int ack_retry;
00662 int max_retry;
00664 size_t message_size;
00666 size_t long_message_burst_size;
00667
00669 synchronised<uint64_t> sequence;
00670
00675 karoo_mutex ack_sem;
00677 map<uint64_t,rock_datagram_message*> ack_array;
00678 friend void rock_datagram_message_ack::run();
00680 map<uint64_t,rock_datagram_message*> long_ack_array;
00681
00683 exepool* general_pool;
00684
00686 datagram_server* server;
00688 datagram_client* client;
00689 friend void rock_datagram_message_handler::sendAck(rock_datagram_message* reply);
00690
00692 exeque* output_queue;
00694 exeque* run_queue;
00696 rock_datagram_factory* factory;
00698 default_rock_datagram_message_handler* handler;
00703 vector<rock_datagram_message_handler*> handlers;
00705 friend void message_handler_runner::run();
00706
00713 virtual rock_datagram_message* createRockByeMessage();
00714
00715 mutable karoo_mutex bye_sem;
00716 bool stopping;
00717
00718 class rock_bye: public pebble
00719 {
00720 private:
00721 rock_datagram_message* bye;
00722 uint64_t bye_seq;
00723 rock* parent;
00724 log logger;
00725 public:
00726 rock_bye(rock* parent, const log& logger);
00727 virtual ~rock_bye();
00728
00729 uint64_t getSequence() { return bye_seq; }
00730
00731 void run();
00732 };
00733
00734 class long_runner : public pebble
00735 {
00736 private:
00737 rock* parent;
00738 karoo_mutex sem;
00739 map<uint64_t,rock_datagram_message*> waiting;
00740 vector<rock_datagram_message*> packets;
00741 size_t long_message_burst_size;
00742 size_t min_size;
00743 rock_datagram_message* msg;
00744 bool nacked;
00745
00746 public:
00747 long_runner(rock* parent, size_t long_message_burst_size, rock_datagram_message* msg);
00748 virtual ~long_runner();
00749 void ack(uint64_t seq);
00750 void nack();
00751 void add(rock_datagram_message* pkt);
00752 void run();
00753 };
00754
00755 class long_hdr_reply : public rock_datagram_reply
00756 {
00757 private:
00758 rock* parent;
00759 long_runner* runner;
00760
00761 public:
00762 long_hdr_reply(rock* parent, long_runner* runner, rock_datagram_message* msg);
00763 virtual ~long_hdr_reply();
00764 void run();
00765 };
00766
00767 class long_runner;
00768
00769 class long_pkt_reply : public rock_datagram_reply
00770 {
00771 private:
00772 rock* parent;
00773 long_runner* runner;
00774
00775 public:
00776 long_pkt_reply(rock* parent, long_runner* runner, rock_datagram_message* msg);
00777 virtual ~long_pkt_reply();
00778 void run();
00779 };
00780
00781 class rock_keep_connected: public pebble
00782 {
00783 private:
00784 time_t poll_secs;
00785 rock* parent;
00786 log logger;
00787
00788 class rock_keep_connected_reply : public rock_datagram_reply
00789 {
00790 private:
00791 rock_keep_connected* parent;
00792 public:
00793 rock_keep_connected_reply(rock_keep_connected* parent, rock_datagram_message* msg)
00794 : rock_datagram_reply(msg) { this->parent = parent; parent->reference(); }
00795 virtual ~rock_keep_connected_reply() { parent->dereference(); }
00796 void run() { parent->acked(); }
00797 };
00798
00799 public:
00800 rock_keep_connected(rock* parent, const log& logger, time_t poll_secs);
00801 virtual ~rock_keep_connected() {}
00802
00803 void acked();
00804 void run();
00805 };
00806
00807
00808 mutable karoo_mutex children_sem;
00809 map<text,remote_rock*> children;
00810 multimap<text,remote_rock*> children_by_type;
00811 protected:
00813 void getArguments(const vector<text>& opts, int argc, char *argv[]);
00820 virtual datagram_status create_client();
00829 virtual datagram_status create_server();
00834 virtual datagram_status init();
00835
00837 log logger;
00838
00846 remote_rock* findRockByType(const text& type);
00847
00859 bool syncChildren(map<text,remote_rock*>& rocks, vector<remote_rock*>& rocks_arr);
00860
00861 public:
00868 rock();
00869
00893 rock(const vector<text>& opts, int argc, char *argv[]);
00894
00901 virtual ~rock();
00902
00903
00908 datagram_status run();
00909
00913 void stop(bool block = true);
00914
00916 void refuse() { output_queue->refuse(); }
00917
00923 void bye();
00924
00929 bool isStopping() const;
00930
00937 void add(pebble* p);
00945 void add(const struct timespec& ts, pebble* p);
00954 void add(int after_seconds, pebble* p);
00955
00963 int getImminentCount(unsigned usecs = 0) const;
00964
00969 int getQueueCount() const;
00970
00974 exepool* getGeneralPool() { return general_pool; }
00975
00981 const text& getArgument(const text& opt) const;
00988 const text& getArgument(const text& opt, const text& def) const;
00995 bool getBoolArgument(const text& opt, bool def) const;
00996
01005 virtual void handle(rock_datagram_message* msg);
01006
01015 void addHandler(rock_datagram_message_handler* h);
01016
01017
01022 virtual text getType() const { return "rock"; }
01023
01024
01025
01026
01027 virtual uint8_t getLoadFactor() const { return 0; }
01028
01030 const text& getHost() const { return host; }
01032 int getPort() const { return port; }
01034 int getSelfPort() const { return self_port; }
01036 text getName() const { return name; }
01038 int getAckRetry() const { return ack_retry; }
01040 int getMaxRetry() const { return max_retry; }
01042 size_t getMessageSize() const { return message_size; }
01044 log getLogger() const { return logger; }
01046 enum log_level getLoggerLevel() const { return (enum log_level)logger; }
01047
01056 virtual rock_datagram_message* createMessage(uint32_t msg_type, text recipient_host = null_string, int recipient_port = 0, size_t message_size = 0);
01062 virtual rock_datagram_message* createRockHelloMessage();
01068 virtual rock_datagram_message* createRockPingMessage();
01069
01077 void startKeepConnectedHello(time_t poll_secs = 5, time_t initial_delay = 0);
01078
01086 void sendMessageByAddress(rock_datagram_message* msg);
01087
01095 void sendMessage(rock_datagram_message* msg);
01096
01106 bool sendMessageByType(rock_datagram_message* msg, const text& type);
01107
01115 void doHello(const text& name, const text& type, const text& host, int port);
01122 void doPing(const text& name, const text& host, int port, bool create = false);
01131 void doPing(const text& name, const text& type, const text& host, int port, uint8_t load_factor);
01136 void doBye(const text& name);
01147 void doLongHeader(const text& name, const text& host, int port, uint32_t long_size, uint32_t num_packets, rock_datagram_message* msg, uint64_t seq);
01159 bool doLongPacket(const text& name, const text& host, int port, uint64_t hdr_seq, uint32_t pkt_num, uint32_t this_size, unsigned char* data);
01160
01168 void notifyAcked(uint64_t seq, rock_datagram_message* msg);
01169
01171 uint64_t getSequence() { return ++sequence; }
01172
01179 bool getRemoteRockType(const text& name, text& type) const;
01180 };
01181 }
01182
01183 #endif