40 #include "TaskContextServer.hpp"
41 #include "TaskContextProxy.hpp"
42 #include "corba.h"
43 #ifdef CORBA_IS_TAO
44 #include "TaskContextS.h"
45 #include <orbsvcs/CosNamingC.h>
46 // ACE Specific, for printing exceptions.
47 #include <ace/SString.h>
48 #include "tao/TimeBaseC.h"
49 #include "tao/Messaging/Messaging.h"
50 #include "tao/Messaging/Messaging_RT_PolicyC.h"
51 #else
52 #include <omniORB4/Naming.hh>
53 #endif
54 #include "TaskContextC.h"
55 #include "TaskContextI.h"
56 #include "DataFlowI.h"
57 #include "POAUtility.h"
58 #include <iostream>
59 #include <fstream>
61 #include "../../os/threads.hpp"
62 #include "../../Activity.hpp"
64 namespace RTT
65 {namespace corba
66 {
67  using namespace std;
69  std::map<TaskContext*, TaskContextServer*> TaskContextServer::servers;
71  base::ActivityInterface* TaskContextServer::orbrunner = 0;
73  bool TaskContextServer::is_shutdown = false;
75  std::map<TaskContext*, std::string> TaskContextServer::iors;
78  {
79  Logger::In in("~TaskContextServer()");
80  servers.erase(mtaskcontext);
82  // Remove taskcontext ior reference
83  iors.erase(mtaskcontext);
85  PortableServer::ObjectId_var oid = mpoa->servant_to_id(mtask_i.in());
86  mpoa->deactivate_object(oid);
88  if (muse_naming) {
89  try {
90  CORBA::Object_var rootObj = orb->resolve_initial_references("NameService");
91  CosNaming::NamingContext_var rootNC = CosNaming::NamingContext::_narrow(rootObj.in());
93  if (CORBA::is_nil( rootNC.in() ) ) {
94  log(Warning) << "CTaskContext '"<< mregistered_name << "' could not find CORBA Naming Service."<<endlog();
95  } else {
96  // Nameserver found...
97  CosNaming::Name name;
98  name.length(2);
99  name[0].id = CORBA::string_dup("TaskContexts");
100  name[1].id = CORBA::string_dup( mregistered_name.c_str() );
101  try {
102  rootNC->unbind(name);
103  log(Info) << "Successfully removed CTaskContext '"<< mregistered_name <<"' from CORBA Naming Service."<<endlog();
104  }
105  catch( CosNaming::NamingContext::NotFound ) {
106  log(Info) << "CTaskContext '"<< mregistered_name << "' task was already unbound."<<endlog();
107  }
108  catch( ... ) {
109  log(Warning) << "CTaskContext '"<< mregistered_name << "' unbinding failed."<<endlog();
110  }
111  }
112  } catch (...) {
113  log(Warning) << "CTaskContext '"<< mregistered_name << "' unbinding failed from CORBA Naming Service."<<endlog();
114  }
115  }
116  }
119  void TaskContextServer::initTaskContextServer(bool require_name_service)
120  {
121  Logger::In in("TaskContextServer()");
122  servers[mtaskcontext] = this;
123  try {
124  // Each server has its own POA.
125  // The server's objects have their own poa as well.
126  CORBA::Object_var poa_object =
127  orb->resolve_initial_references ("RootPOA");
128  mpoa = PortableServer::POA::_narrow(poa_object);
129  PortableServer::POAManager_var poa_manager =
130  mpoa->the_POAManager ();
132  //poa = POAUtility::create_basic_POA( poa, poa_manager, taskc->getName().c_str(), 0, 1);
133  // poa_manager->activate ();
135  // TODO : Use a better suited POA than create_basic_POA, use the 'session' or so type
136  // But watch out: we need implicit activation, our you will get exceptions upon ->_this()
137  // The POA for the Server's objects:
138  // PortableServer::POA_var objpoa = POAUtility::create_basic_POA(poa,
139  // poa_manager,
140  // std::string(taskc->getName() + "OBJPOA").c_str(),
141  // 0, 0); // Not persistent, allow implicit.
143  // The servant : TODO : cleanup servant in destructor !
145  mtask_i = serv = new RTT_corba_CTaskContext_i( mtaskcontext, mpoa );
146  mtask = serv->activate_this();
148  // Store reference to iors
149  CORBA::String_var ior = orb->object_to_string( mtask.in() );
150  iors[mtaskcontext] = std::string( ior.in() );
152  if ( muse_naming ) {
153  CORBA::Object_var rootObj;
154  CosNaming::NamingContext_var rootNC;
155  try {
156  rootObj = orb->resolve_initial_references("NameService");
157  rootNC = CosNaming::NamingContext::_narrow(rootObj);
158  } catch (...) {}
160  if (CORBA::is_nil( rootNC ) ) {
161  std::string err("CTaskContext '" + mregistered_name + "' could not find CORBA Naming Service.");
162  if (require_name_service) {
163  log(Error) << err << endlog();
164  servers.erase(mtaskcontext);
165  throw IllegalServer(err);
166  }
167  else
168  {
169  log(Warning) << err << endlog();
171  log() <<"Writing IOR to 'std::cerr' and file '" << mregistered_name <<".ior'"<<endlog();
173  // this part only publishes the IOR to a file.
174  CORBA::String_var ior = orb->object_to_string( mtask.in() );
175  std::cerr << ior.in() <<std::endl;
176  {
177  // write to a file as well.
178  std::string iorname( mregistered_name );
179  iorname += ".ior";
180  std::ofstream file_ior( iorname.c_str() );
181  file_ior << ior.in() <<std::endl;
182  }
183 #endif
184  return;
185  }
186  }
187  log(Info) << "CTaskContext '"<< mregistered_name << "' found CORBA Naming Service."<<endlog();
188  // Nameserver found...
189  CosNaming::Name name;
190  name.length(1);
191  name[0].id = CORBA::string_dup("TaskContexts");
192  CosNaming::NamingContext_var controlNC;
193  try {
194  controlNC = rootNC->bind_new_context(name);
195  }
196  catch( CosNaming::NamingContext::AlreadyBound&) {
197  log(Debug) << "NamingContext 'TaskContexts' already bound to CORBA Naming Service."<<endlog();
198  // NOP.
199  }
201  name.length(2);
202  name[1].id = CORBA::string_dup( mregistered_name.c_str() );
203  try {
204  rootNC->bind(name, mtask );
205  log(Info) << "Successfully added CTaskContext '"<< mregistered_name <<"' to CORBA Naming Service."<<endlog();
206  }
207  catch( CosNaming::NamingContext::AlreadyBound&) {
208  log(Warning) << "CTaskContext '"<< mregistered_name << "' already bound to CORBA Naming Service."<<endlog();
209  log() <<"Trying to rebind...";
210  try {
211  rootNC->rebind(name, mtask);
212  } catch( ... ) {
213  log() << " failed!"<<endlog();
214  return;
215  }
216  log() << " done. New CTaskContext bound to Naming Service."<<endlog();
217  }
218  } // use_naming
219  else {
220  log(Info) <<"CTaskContext '"<< mregistered_name << "' is not using the CORBA Naming Service."<<endlog();
222  log() <<"Writing IOR to 'std::cerr' and file '" << mregistered_name <<".ior'"<<endlog();
224  // this part only publishes the IOR to a file.
225  CORBA::String_var ior = orb->object_to_string( mtask.in() );
226  std::cerr << ior.in() <<std::endl;
227  {
228  // write to a file as well.
229  std::string iorname( mregistered_name );
230  iorname += ".ior";
231  std::ofstream file_ior( iorname.c_str() );
232  file_ior << ior.in() <<std::endl;
233  }
234 #endif
235  return;
236  }
237  }
238  catch (CORBA::Exception &e) {
239  log(Error) << "CORBA exception raised!" << endlog();
240  log() << CORBA_EXCEPTION_INFO(e) << endlog();
241  }
243  }
245  TaskContextServer::TaskContextServer(TaskContext* taskc, const string& alias, bool use_naming, bool require_name_service)
246  : mtaskcontext(taskc), muse_naming(use_naming), mregistered_name(alias)
247  {
248  this->initTaskContextServer(require_name_service);
249  }
252  TaskContextServer::TaskContextServer(TaskContext* taskc, bool use_naming, bool require_name_service)
253  : mtaskcontext(taskc), muse_naming(use_naming), mregistered_name(taskc->getName())
254  {
255  this->initTaskContextServer(require_name_service);
256  }
259  if ( !CORBA::is_nil(orb) && !is_shutdown) {
260  log(Info) << "Cleaning up TaskContextServers..."<<endlog();
261  while ( !servers.empty() ){
262  delete servers.begin()->second;
263  // note: destructor will self-erase from map !
264  }
266  log() << "Cleanup done."<<endlog();
267  }
268  }
271  if ( !CORBA::is_nil(orb) ) {
272  ServerMap::iterator it = servers.find(c);
273  if ( it != servers.end() ){
274  log(Info) << "Cleaning up TaskContextServer for "<< c->getName()<<endlog();
276  delete it->second; // destructor will do the rest.
277  // note: destructor will self-erase from map !
278  }
279  }
280  }
282  void TaskContextServer::ShutdownOrb(bool wait_for_completion)
283  {
284  Logger::In in("ShutdownOrb");
285  DoShutdownOrb(wait_for_completion);
286  }
288  void TaskContextServer::DoShutdownOrb(bool wait_for_completion)
289  {
290  if (is_shutdown) {
291  log(Info) << "Orb already down..."<<endlog();
292  return;
293  }
294  if ( CORBA::is_nil(orb) ) {
295  log(Error) << "Orb Shutdown...failed! Orb is nil." << endlog();
296  return;
297  }
299  try {
300  CleanupServers(); // can't do this after an orb->shutdown().
301  log(Info) << "Orb Shutdown...";
302  is_shutdown = true;
303  if (wait_for_completion)
304  log(Info)<<"waiting..."<<endlog();
305  orb->shutdown( wait_for_completion );
306  log(Info) << "done." << endlog();
307  }
308  catch (CORBA::Exception &e) {
309  log(Error) << "Orb Shutdown...failed! CORBA exception raised." << endlog();
310  log() << CORBA_EXCEPTION_INFO(e) << endlog();
311  return;
312  }
313  }
317  {
318  if ( CORBA::is_nil(orb) ) {
319  log(Error) << "RunOrb...failed! Orb is nil." << endlog();
320  return;
321  }
322  try {
323  log(Info) <<"Entering orb->run()."<<endlog();
324  orb->run();
325  log(Info) <<"Breaking out of orb->run()."<<endlog();
326  }
327  catch (CORBA::Exception &e) {
328  log(Error) << "Orb Run : CORBA exception raised!" << endlog();
329  log() << CORBA_EXCEPTION_INFO(e) << endlog();
330  }
331  }
336  class OrbRunner
337  : public Activity
338  {
339  public:
340  OrbRunner(int scheduler, int priority, unsigned cpu_affinity)
341  : Activity(scheduler, priority, cpu_affinity)
342  {}
343  void loop()
344  {
345  Logger::In in("OrbRunner");
347  }
349  bool breakLoop()
350  {
351  return true;
352  }
354  void finalize()
355  {
356  Logger::In in("OrbRunner");
357  log(Info) <<"Safely stopped."<<endlog();
358  }
359  };
362  void TaskContextServer::ThreadOrb(int scheduler, int priority, unsigned cpu_affinity)
363  {
364  Logger::In in("ThreadOrb");
365  if ( CORBA::is_nil(orb) ) {
366  log(Error) << "ThreadOrb...failed! Orb is nil." << endlog();
367  return;
368  }
369  if (orbrunner != 0) {
370  log(Error) <<"Orb already running in a thread."<<endlog();
371  } else {
372  log(Info) <<"Starting Orb in a thread."<<endlog();
373  orbrunner = new OrbRunner(scheduler, priority, cpu_affinity);
374  orbrunner->start();
375  }
376  }
379  {
380  Logger::In in("DestroyOrb");
381  if ( CORBA::is_nil(orb) ) {
382  log(Error) << "DestroyOrb...failed! Orb is nil." << endlog();
383  return;
384  }
386  if (orbrunner) {
387  orbrunner->stop();
388  delete orbrunner;
389  orbrunner = 0;
390  }
392  try {
393  // Destroy the POA, waiting until the destruction terminates
394  //poa->destroy (1, 1);
395  CleanupServers();
396  orb->destroy();
397  rootPOA = 0;
398  orb = 0;
399  log(Info) <<"Orb destroyed."<<endlog();
400  }
401  catch (CORBA::Exception &e) {
402  log(Error) << "Orb Destroy : CORBA exception raised!" << endlog();
403  log() << CORBA_EXCEPTION_INFO(e) << endlog();
404  }
406  }
408  TaskContextServer* TaskContextServer::Create(TaskContext* tc, bool use_naming, bool require_name_service){
409  return TaskContextServer::Create(tc, tc->getName(), use_naming, require_name_service);
410  }
412  TaskContextServer* TaskContextServer::Create(TaskContext* tc, const std::string& alias, bool use_naming, bool require_name_service) {
413  if ( CORBA::is_nil(orb) )
414  return 0;
416  if ( servers.count(tc) ) {
417  log(Debug) << "Returning existing TaskContextServer for "<< alias <<endlog();
418  return servers.find(tc)->second;
419  }
421  // create new:
422  log(Info) << "Creating new TaskContextServer for "<< alias <<endlog();
423  try {
424  TaskContextServer* cts = new TaskContextServer(tc, alias, use_naming, require_name_service);
425  return cts;
426  }
427  catch( IllegalServer& is ) {
428  cerr << is.what() << endl;
429  }
430  return 0;
431  }
433  CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, bool use_naming, bool require_name_service) {
434  return TaskContextServer::CreateServer(tc, tc->getName(), use_naming, require_name_service);
435  }
437  CTaskContext_ptr TaskContextServer::CreateServer(TaskContext* tc, const std::string& alias, bool use_naming, bool require_name_service) {
438  if ( CORBA::is_nil(orb) )
439  return CTaskContext::_nil();
441  if ( servers.count(tc) ) {
442  log(Debug) << "Returning existing TaskContextServer for "<< alias <<endlog();
443  return CTaskContext::_duplicate( servers.find(tc)->second->server() );
444  }
446  for (TaskContextProxy::PMap::iterator it = TaskContextProxy::proxies.begin(); it != TaskContextProxy::proxies.end(); ++it)
447  if ( (it->first) == tc ) {
448  log(Debug) << "Returning server of Proxy for "<< alias <<endlog();
449  return CTaskContext::_duplicate(it->second);
450  }
452  // create new:
453  log(Info) << "Creating new TaskContextServer for "<< alias <<endlog();
454  try {
455  TaskContextServer* cts = new TaskContextServer(tc, alias, use_naming, require_name_service);
456  return CTaskContext::_duplicate( cts->server() );
457  }
458  catch( IllegalServer& is ) {
459  cerr << is.what() << endl;
460  }
461  return CTaskContext::_nil();
462  }
465  CTaskContext_ptr TaskContextServer::server() const
466  {
467  // we're not a factory function, so we don't _duplicate.
468  return mtask.in();
469  }
472  {
473  IorMap::const_iterator it = iors.find(tc);
474  if (it != iors.end())
475  return it->second;
477  return std::string("");
478  }
480 }}
