@@ -33,264 +33,72 @@ namespace details
3333
3434comet_manager::comet_manager ()
3535{
36- pthread_rwlock_init (&comet_guard, NULL );
37- pthread_mutex_init (&cleanmux, NULL );
38- pthread_cond_init (&cleancond, NULL );
3936}
4037
4138comet_manager::~comet_manager ()
4239{
43- pthread_rwlock_destroy (&comet_guard);
44- pthread_mutex_destroy (&cleanmux);
45- pthread_cond_destroy (&cleancond);
4640}
4741
48- void comet_manager::send_message_to_topic (
49- const string& topic,
50- const string& message,
51- const httpserver::http::http_utils::start_method_T& start_method
52- )
42+ void comet_manager::send_message_to_topic (const string& topic, const string& message)
5343{
54- pthread_rwlock_wrlock (&comet_guard);
55- for (set<http::httpserver_ska>::const_iterator it = q_waitings[topic].begin ();
56- it != q_waitings[topic].end ();
57- ++it
58- )
59- {
60- q_messages[(*it)].push_back (message);
61- q_signal.insert ((*it));
62- if (start_method != http::http_utils::INTERNAL_SELECT)
63- {
64- pthread_mutex_lock (&q_blocks[(*it)].first );
65- pthread_cond_signal (&q_blocks[(*it)].second );
66- pthread_mutex_unlock (&q_blocks[(*it)].first );
67- }
68- map<http::httpserver_ska, long >::const_iterator itt;
69- if ((itt = q_keepalives.find (*it)) != q_keepalives.end ())
70- {
71- struct timeval curtime;
72- gettimeofday (&curtime, NULL );
73- q_keepalives[*it] = curtime.tv_sec ;
74- }
75- }
76- pthread_rwlock_unlock (&comet_guard);
77- if (start_method != http::http_utils::INTERNAL_SELECT)
78- {
79- pthread_mutex_lock (&cleanmux);
80- pthread_cond_signal (&cleancond);
81- pthread_mutex_unlock (&cleanmux);
82- }
83- }
44+ map<string, set<MHD_Connection*> >::const_iterator it = this ->q_topics .find (topic);
45+ if (it == this ->q_topics .end ()) return ;
8446
85- void comet_manager::register_to_topics (
86- const vector<string>& topics,
87- const http::httpserver_ska& connection_id,
88- int keepalive_secs,
89- string keepalive_msg,
90- const httpserver::http::http_utils::start_method_T& start_method
91- )
92- {
93- pthread_rwlock_wrlock (&comet_guard);
94- for (vector<string>::const_iterator it = topics.begin ();
95- it != topics.end (); ++it
96- )
97- q_waitings[*it].insert (connection_id);
98- if (keepalive_secs != -1 )
99- {
100- struct timeval curtime;
101- gettimeofday (&curtime, NULL );
102- q_keepalives[connection_id] = curtime.tv_sec ;
103- q_keepalives_mem[connection_id] = make_pair (
104- keepalive_secs, keepalive_msg
105- );
106- }
107- if (start_method != http::http_utils::INTERNAL_SELECT)
47+ // copying value guarantees we iterate on a copy. Even if the original set is modified we are safe and so we stay lock free.
48+ const set<MHD_Connection*> connections = it->second ;
49+
50+ for (set<MHD_Connection*>::const_iterator c_it = connections.begin (); c_it != connections.end (); ++c_it)
10851 {
109- pthread_mutex_t m;
110- pthread_cond_t c;
111- pthread_mutex_init (&m, NULL );
112- pthread_cond_init (&c, NULL );
113- q_blocks[connection_id] =
114- make_pair (m, c);
115- }
116- pthread_rwlock_unlock (&comet_guard);
117- }
52+ map<MHD_Connection*, deque<string> >::iterator message_queue_it = this ->q_messages .find (*c_it);
53+ if (message_queue_it == this ->q_messages .end ()) continue ;
11854
119- size_t comet_manager::read_message (const http::httpserver_ska& connection_id,
120- string& message
121- )
122- {
123- pthread_rwlock_wrlock (&comet_guard);
124- deque<string>& t_deq = q_messages[connection_id];
125- message.assign (t_deq.front ());
126- t_deq.pop_front ();
127- pthread_rwlock_unlock (&comet_guard);
128- return message.size ();
55+ message_queue_it->second .push_back (message);
56+
57+ MHD_resume_connection (*c_it);
58+ }
12959}
13060
131- size_t comet_manager::get_topic_consumers (
132- const string& topic,
133- set<http::httpserver_ska>& consumers
134- )
61+ void comet_manager::register_to_topics (const vector<string>& topics, MHD_Connection* connection_id)
13562{
136- pthread_rwlock_rdlock (&comet_guard);
137-
138- for (set<http::httpserver_ska>::const_iterator it = q_waitings[topic].begin ();
139- it != q_waitings[topic].end (); ++it
140- )
63+ for (vector<string>::const_iterator it = topics.begin (); it != topics.end (); ++it)
14164 {
142- consumers .insert ((*it));
65+ this -> q_topics [*it] .insert (connection_id); // (1) Can this cause problems in concurrency with (2) ?
14366 }
144- std::set<httpserver::http::httpserver_ska>::size_type size = consumers.size ();
145- pthread_rwlock_unlock (&comet_guard);
146- return size;
67+ this ->q_subscriptions .insert (make_pair (connection_id, set<string>(topics.begin (), topics.end ())));
68+ this ->q_messages .insert (make_pair (connection_id, deque<string>()));
14769}
14870
149- bool comet_manager::pop_signaled (const http::httpserver_ska& consumer,
150- const httpserver::http::http_utils::start_method_T& start_method
151- )
71+ size_t comet_manager::read_message (MHD_Connection* connection_id, string& message)
15272{
153- if (start_method == http::http_utils::INTERNAL_SELECT )
73+ if (this -> q_messages [connection_id]. empty () )
15474 {
155- pthread_rwlock_wrlock (&comet_guard);
156- set<http::httpserver_ska>::iterator it = q_signal.find (consumer);
157- if (it != q_signal.end ())
158- {
159- if (q_messages[consumer].empty ())
160- {
161- q_signal.erase (it);
162- pthread_rwlock_unlock (&comet_guard);
163- return false ;
164- }
165- pthread_rwlock_unlock (&comet_guard);
166- return true ;
167- }
168- else
169- {
170- pthread_rwlock_unlock (&comet_guard);
171- return false ;
172- }
75+ MHD_suspend_connection (connection_id);
76+ return 0 ;
17377 }
174- else
175- {
176- pthread_rwlock_rdlock (&comet_guard);
177- pthread_mutex_lock (&q_blocks[consumer].first );
178- struct timespec t;
179- struct timeval curtime;
180-
181- {
182- bool to_unlock = true ;
183- while (q_signal.find (consumer) == q_signal.end ())
184- {
185- if (to_unlock)
186- {
187- pthread_rwlock_unlock (&comet_guard);
188- to_unlock = false ;
189- }
190- gettimeofday (&curtime, NULL );
191- t.tv_sec = curtime.tv_sec + q_keepalives_mem[consumer].first ;
192- t.tv_nsec = 0 ;
193- int rslt = pthread_cond_timedwait (&q_blocks[consumer].second ,
194- &q_blocks[consumer].first , &t
195- );
196- if (rslt == ETIMEDOUT)
197- {
198- pthread_rwlock_wrlock (&comet_guard);
199- send_message_to_consumer (consumer,
200- q_keepalives_mem[consumer].second , false , start_method
201- );
202- pthread_rwlock_unlock (&comet_guard);
203- }
204- }
205- if (to_unlock)
206- pthread_rwlock_unlock (&comet_guard);
207- }
20878
209- if (q_messages[consumer].size () == 0 )
210- {
211- pthread_rwlock_wrlock (&comet_guard);
212- q_signal.erase (consumer);
213- pthread_mutex_unlock (&q_blocks[consumer].first );
214- pthread_rwlock_unlock (&comet_guard);
215- return false ;
216- }
217- pthread_rwlock_rdlock (&comet_guard);
218- pthread_mutex_unlock (&q_blocks[consumer].first );
219- pthread_rwlock_unlock (&comet_guard);
220- return true ;
221- }
222- return false ;
79+ deque<string>& t_deq = this ->q_messages [connection_id];
80+ message.assign (t_deq.front ());
81+ t_deq.pop_front ();
82+ return message.size ();
22383}
22484
225- void comet_manager::complete_request (const http::httpserver_ska& connection_id)
85+ void comet_manager::complete_request (MHD_Connection* connection_id)
22686{
227- pthread_rwlock_wrlock (&comet_guard);
228- q_messages.erase (connection_id);
229- q_blocks.erase (connection_id);
230- q_signal.erase (connection_id);
231- q_keepalives.erase (connection_id);
87+ this ->q_messages .erase (connection_id);
23288
233- typedef map<string, set<http::httpserver_ska> >::iterator conn_it;
234- for (conn_it it = q_waitings.begin (); it != q_waitings.end (); ++it)
235- {
236- it->second .erase (connection_id);
237- }
238- pthread_rwlock_unlock (&comet_guard);
239- }
89+ map<MHD_Connection*, set<string> >::iterator topics_it = this ->q_subscriptions .find (connection_id);
90+ if (topics_it == q_subscriptions.end ()) return ;
91+ set<string> topics = topics_it->second ;
24092
241- void comet_manager::comet_select (unsigned long long * timeout_secs,
242- unsigned long long * timeout_microsecs,
243- const httpserver::http::http_utils::start_method_T& start_method
244- )
245- {
246- pthread_rwlock_wrlock (&comet_guard);
247- for (map<http::httpserver_ska, long >::iterator it = q_keepalives.begin (); it != q_keepalives.end (); ++it)
93+ for (set<string>::const_iterator it = topics.begin (); it != topics.end (); ++it)
24894 {
249- struct timeval curtime;
250- gettimeofday (&curtime, NULL );
251- int waited_time = curtime.tv_sec - (*it).second ;
252- if (waited_time >= q_keepalives_mem[(*it).first ].first )
253- {
254- send_message_to_consumer ((*it).first , q_keepalives_mem[(*it).first ].second , true , start_method);
255- }
256- else
257- {
258- unsigned long long to_wait_time = (q_keepalives_mem[(*it).first ].first - waited_time);
259- if (to_wait_time < *timeout_secs)
260- {
261- *timeout_secs = to_wait_time;
262- *timeout_microsecs = 0 ;
263- }
264- }
265- }
266- pthread_rwlock_unlock (&comet_guard);
267- }
95+ map<string, set<MHD_Connection*> >::iterator connections_it = this ->q_topics .find (*it);
96+ if (connections_it == this ->q_topics .end ()) continue ;
26897
269- void comet_manager::send_message_to_consumer (
270- const http::httpserver_ska& connection_id,
271- const std::string& message,
272- bool to_lock,
273- const httpserver::http::http_utils::start_method_T& start_method
274- )
275- {
276- // This function need to be externally locked on write
277- q_messages[connection_id].push_back (message);
278- map<http::httpserver_ska, long >::const_iterator it;
279- if ((it = q_keepalives.find (connection_id)) != q_keepalives.end ())
280- {
281- struct timeval curtime;
282- gettimeofday (&curtime, NULL );
283- q_keepalives[connection_id] = curtime.tv_sec ;
284- }
285- q_signal.insert (connection_id);
286- if (start_method != http::http_utils::INTERNAL_SELECT)
287- {
288- if (to_lock)
289- pthread_mutex_lock (&q_blocks[connection_id].first );
290- pthread_cond_signal (&q_blocks[connection_id].second );
291- if (to_lock)
292- pthread_mutex_unlock (&q_blocks[connection_id].first );
98+ connections_it->second .erase (connection_id);
99+ if (connections_it->second .size () == 0 ) this ->q_topics .erase (*it); // (2)
293100 }
101+ q_subscriptions.erase (connection_id);
294102}
295103
296104} // details
0 commit comments