00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef stream_h
00023 #define stream_h
00024
00025 #ifndef _REENTRANT
00026 #define _REENTRANT
00027 #endif
00028
00029 #include "config.h"
00030 #include <openssl/evp.h>
00031 #include <openssl/ssl.h>
00032 #include <netinet/in.h>
00033 #include "util.h"
00034 #include "queue.h"
00035 #include <vector>
00036 using std::vector;
00037 #include <map>
00038 using std::map;
00039 using std::pair;
00040
00041 namespace karoo {
00042
00043 enum stream_status {
00044 STREAM_OK = 0,
00045 STREAM_UNINITIALISED,
00046 STREAM_PIPE_FAILED,
00047 STREAM_PIPE_GET_FLAGS_FAILED,
00048 STREAM_PIPE_SET_FLAGS_FAILED,
00049 STREAM_PIPE_OK,
00050 STREAM_CREATE_FAILED,
00051 STREAM_SET_OPTIONS_FAILED,
00052 STREAM_BIND_FAILED,
00053 STREAM_LISTEN_FAILED,
00054 STREAM_GET_FLAGS_FAILED,
00055 STREAM_SET_FLAGS_FAILED,
00056 STREAM_SOCKET_FAILED,
00057 STREAM_RESOLVE_NAME_FAILED,
00058 STREAM_CONNECT_FAILED,
00059 STREAM_ACCEPT_FAILED,
00060 STREAM_SSL_FAILED,
00061 };
00062
00063 extern const char* stream_status_to_str(enum stream_status status);
00064
00065 class stream_socket;
00066 class stream_service;
00067
00068 class stream_factory {
00069 protected:
00070 stream_service* parent;
00071 public:
00072 stream_factory(stream_service* parent);
00073 virtual ~stream_factory();
00074
00075 virtual stream_socket* createSocket();
00076 };
00077
00078 class stream_service : public pebble {
00079 private:
00080 mutable karoo_mutex sem;
00081 enum stream_status status;
00082 int fh;
00083 int port;
00084 exeque* run_queue;
00085 stream_factory* factory;
00086 vector<stream_socket*> sockets;
00087 sigset_t origSigSet;
00088 sighandler_t prevSigHandler;
00089 time_t close_linger_time;
00090 time_t close_inactivity_time;
00091
00092 int pfd[2];
00093
00094 mutable karoo_mutex ssl_sem;
00095 bool has_ssl;
00096 SSL_CTX* ctx;
00097 SSL_CTX* client_ctx;
00098
00099 void clearSSL();
00100
00101
00102 protected:
00103 log logger;
00104 public:
00105 stream_service(exeque* run_queue, time_t close_linger_time = 15, time_t close_inactivity_time = 45);
00106 virtual ~stream_service();
00107
00108 void setFactory(stream_factory* factory);
00109
00117 bool initialiseSSL(const text& cert_fname, const text& privatekey_fname, int type);
00118
00124 bool logSSLErrorStatus(stream_socket* sock, const text& context);
00125
00126 SSL_CTX* getSSLContext();
00127 SSL_CTX* getClientSSLContext();
00128
00129 void pushSocket(stream_socket* sock);
00130 void eraseSocket(stream_socket* sock);
00131
00132 enum stream_status startListening(int port);
00133 void stopListening();
00134
00139 void setLogger(const log& logger);
00140 log getLogger();
00141
00142 void run();
00143
00144 int getPort() const;
00145
00146 text describe() const;
00147 };
00148
00149
00150 class stream_socket : public pebble {
00151 protected:
00152 enum stream_status status;
00153 text host;
00154 int port;
00155 struct sockaddr_in remoteAddr;
00156 int fh;
00157 time_t close_pending;
00158 stream_service* parent;
00159 stream_buffer write_buffer;
00160 stream_buffer read_buffer;
00161 size_t max_read_buffer_size;
00162 time_t last_network_io;
00163 SSL* ssl;
00164 mutable bool needs_read;
00165 mutable bool needs_write;
00166 mutable karoo_mutex sem;
00167
00168 void clearSSL();
00169 protected:
00170 friend void stream_service::run();
00171 size_t getWriteBufferSize() const;
00172 size_t getReadBufferSize() const;
00173
00174 ssize_t dumpWriteBuffer(int fh);
00175
00176 ssize_t dumpReadBuffer(int fh);
00177
00178 public:
00179 stream_socket(stream_service* parent, size_t max_read_buffer_size = 1024);
00180 stream_socket(stream_service* parent, const text& host, int port, size_t max_read_buffer_size = 1024);
00181 virtual ~stream_socket();
00182
00183 log getLogger();
00184
00185 void close();
00186 virtual void scheduleClose();
00187 time_t closeScheduled() const;
00188
00189 int initByAccept(int ah);
00190
00191 int getFileHandle() const;
00192 enum stream_status getStatus() const;
00193
00194 ssize_t write(const char* data, size_t sz);
00195 ssize_t write(const text& data);
00196 ssize_t read(char* data, size_t sz);
00197 ssize_t available() const;
00198
00199 int getPort() const;
00200 text getHost() const;
00201 int getRemotePort() const;
00202 text getRemoteHost() const;
00203 bool isIncoming() const;
00204 bool needsRead(bool clear = false) const;
00205 bool needsWrite(bool clear = false) const;
00206
00207 void run() {}
00208
00209 virtual text describe() const;
00210
00211 time_t getLastNetworkIO() const;
00212 };
00213
00214 enum HTTP_parser_state {
00215 HTTP_PARSER_STATE_REQUEST,
00216 HTTP_PARSER_STATE_HEADER_KEY,
00217 HTTP_PARSER_STATE_HEADER_VALUE,
00218 HTTP_PARSER_STATE_BODY,
00219 };
00220
00221 enum HTTP_response_type {
00222 HTTP_OK = 200,
00223 HTTP_CREATED = 201,
00224 HTTP_ACCEPTED = 202,
00225 HTTP_NOT_AUTHORITATIVE = 203,
00226 HTTP_NO_CONTENT = 204,
00227 HTTP_RESET = 205,
00228 HTTP_PARTIAL = 206,
00229
00230 HTTP_MULT_CHOICE = 300,
00231 HTTP_MOVED_PERM = 301,
00232 HTTP_MOVED_TEMP = 302,
00233 HTTP_SEE_OTHER = 303,
00234 HTTP_NOT_MODIFIED = 304,
00235 HTTP_USE_PROXY = 305,
00236
00237 HTTP_BAD_REQUEST = 400,
00238 HTTP_UNAUTHORIZED = 401,
00239 HTTP_PAYMENT_REQUIRED = 402,
00240 HTTP_FORBIDDEN = 403,
00241 HTTP_NOT_FOUND = 404,
00242 HTTP_BAD_METHOD = 405,
00243 HTTP_NOT_ACCEPTABLE = 406,
00244 HTTP_PROXY_AUTH = 407,
00245 HTTP_CLIENT_TIMEOUT = 408,
00246 HTTP_CONFLICT = 409,
00247 HTTP_GONE = 410,
00248 HTTP_LENGTH_REQUIRED = 411,
00249 HTTP_PRECON_FAILED = 412,
00250 HTTP_ENTITY_TOO_LARGE = 413,
00251 HTTP_REQ_TOO_LONG = 414,
00252 HTTP_UNSUPPORTED_TYPE = 415,
00253
00254 HTTP_INTERNAL_ERROR = 500,
00255 HTTP_NOT_IMPLEMENTED = 501,
00256 HTTP_BAD_GATEWAY = 502,
00257 HTTP_UNAVAILABLE = 503,
00258 HTTP_GATEWAY_TIMEOUT = 504,
00259 HTTP_VERSION = 505,
00260 };
00261
00262 enum HTTP_method {
00263 HTTP_METHOD_NOT_SUPPORTED = 0,
00264 HTTP_METHOD_GET,
00265 HTTP_METHOD_POST,
00266 HTTP_METHOD_HEAD,
00267 HTTP_METHOD_PUT,
00268 HTTP_METHOD_DELETE,
00269 HTTP_METHOD_TRACE,
00270 HTTP_METHOD_OPTIONS,
00271 HTTP_METHOD_CONNECT,
00272 };
00273
00274 class HTTP_factory;
00275 class HTTP_handler_factory;
00276 class HTTP_handler;
00277
00278 class HTTP_parsed_data: public pebble {
00279 private:
00280 mutable karoo_mutex sem;
00281 text request;
00282 text http_version;
00283 enum HTTP_method method;
00284 text path;
00285 map<text,text>* request_headers;
00286 stream_buffer* request_body;
00287
00288 enum HTTP_response_type response_type;
00289 stream_buffer response_body;
00290 map<text,text> response_headers;
00291
00292 stream_socket* parent;
00293 HTTP_handler_factory* factory;
00294 HTTP_factory* http_factory;
00295
00296 bool first;
00297 bool timeout_pending;
00298 HTTP_handler* handler;
00299 uint64_t seq;
00300 public:
00305 HTTP_parsed_data(const text& request, map<text,text>* request_headers, char* request_body, size_t request_body_length, HTTP_handler_factory* factory, stream_socket* parent, HTTP_factory* http_factory);
00306 virtual ~HTTP_parsed_data();
00307
00308 void run();
00309 void timedOut();
00310 text formatResponse();
00311
00312 log getLogger();
00313
00314 text getPath() const;
00315 void setPath(const text& path);
00316 text getHttpVersion() const;
00317 void getRequestHeaders(map<text,text>& headers) const;
00318 void getResponseHeaders(map<text,text>& headers) const;
00319 enum HTTP_method getMethod() const;
00320 bool getRequestHeader(const text& key, text& val) const;
00321 bool getResponseHeader(const text& key, text& val) const;
00322 void setResponseHeader(const text& key, const text& val);
00323 void setResponseType(enum HTTP_response_type response_type);
00324 void writeResponseBody(const char* data, size_t sz);
00325 void writeResponseBody(const text& str);
00326 size_t getRequestBodySize() const;
00327 size_t readRequestBody(char* data, size_t sz);
00328 void readRequestParams(vector<pair<text,text> >& params);
00329 ssize_t skipToNextMultipartFormData(const text& boundary);
00330 HTTP_handler* getHandler();
00331 uint64_t getSequence() const { return seq; }
00332 text describe() const;
00333
00334 };
00335
00336 class HTTP_handler: public referable {
00337 protected:
00338 HTTP_parsed_data* data;
00339 public:
00340 HTTP_handler(HTTP_parsed_data* data);
00341 virtual ~HTTP_handler();
00342
00343 virtual bool run() = 0;
00344
00345 virtual void timedOut() {}
00346 };
00347
00348 class HTTP_handler_factory: public referable {
00349 protected:
00350 public:
00351 HTTP_handler_factory() {}
00352 virtual ~HTTP_handler_factory() {}
00353
00354 virtual HTTP_handler* create_HTTP_handler(HTTP_parsed_data* data) = 0;
00355 };
00356
00357 class HTTP_parser: public referable {
00358 private:
00359 mutable karoo_mutex sem;
00360 stream_buffer buffer;
00361 exeque* run_queue;
00362 enum HTTP_parser_state state;
00363 text request;
00364 map<text,text>* headers;
00365 text key;
00366 size_t content_length;
00367 stream_socket* parent;
00368 HTTP_handler_factory* factory;
00369 HTTP_factory* http_factory;
00370 public:
00371 HTTP_parser(exeque* run_queue, HTTP_handler_factory* factory, HTTP_factory* http_factory);
00372 virtual ~HTTP_parser();
00373
00374 bool write(const char* data, size_t sz);
00375
00376 void setSocket(stream_socket* parent);
00377 virtual text describe() const;
00378 };
00379
00380 class HTTP_factory: public stream_factory {
00381 private:
00382 HTTP_handler_factory* handler_factory;
00383 exeque* run_queue;
00384 time_t http_timeout;
00385 khashe http_datas;
00386 mutable uint64_t http_seq;
00387 mutable karoo_mutex http_seq_sem;
00388 public:
00389 HTTP_factory(exeque* run_queue, stream_service* parent, HTTP_handler_factory* handler_factory, time_t http_timeout);
00390 virtual ~HTTP_factory();
00391
00392 stream_socket* createSocket();
00393 exeque* getQueue() { return run_queue; }
00394 time_t getHTTPTimeout() const { return http_timeout; }
00395 void pushHTTPData(int64_t seq, HTTP_parsed_data* http_data);
00396 void eraseHTTPData(int64_t seq);
00397 bool getHTTPData(int64_t seq, HTTP_parsed_data*& http_data);
00398 uint64_t getHTTPSequence() const;
00399 };
00400
00401 class HTTP_socket: public stream_socket
00402 {
00403 HTTP_parser* parser;
00404 public:
00405 HTTP_socket(stream_service* parent, HTTP_parser* parser);
00406 HTTP_socket(stream_service* parent, HTTP_parser* parser, const text& host, int port);
00407 virtual ~HTTP_socket();
00408
00409 virtual text describe() const;
00410
00411 void run();
00412 };
00413
00414 class HTTP_timeout: public pebble {
00415 private:
00416 HTTP_factory* parent;
00417 uint64_t seq;
00418 public:
00419 HTTP_timeout(HTTP_factory* parent, uint64_t seq)
00420 {
00421 this->parent = parent;
00422 this->seq = seq;
00423 dereferenceAfterRun();
00424 }
00425 virtual ~HTTP_timeout()
00426 {
00427 }
00428
00429 virtual void run()
00430 {
00431 HTTP_parsed_data* h;
00432 if (parent->getHTTPData(seq, h)) {
00433 h->timedOut();
00434 }
00435 }
00436 };
00437
00438 }
00439
00440 #endif