Coverage for /Syzygy/trace/service/service.cc

CoverageLines executed / instrumented / missingexe / inst / missLanguageGroup
81.2%2773410.C++source

Line-by-line coverage:

   1    :  // Copyright 2012 Google Inc. All Rights Reserved.
   2    :  //
   3    :  // Licensed under the Apache License, Version 2.0 (the "License");
   4    :  // you may not use this file except in compliance with the License.
   5    :  // You may obtain a copy of the License at
   6    :  //
   7    :  //     http://www.apache.org/licenses/LICENSE-2.0
   8    :  //
   9    :  // Unless required by applicable law or agreed to in writing, software
  10    :  // distributed under the License is distributed on an "AS IS" BASIS,
  11    :  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12    :  // See the License for the specific language governing permissions and
  13    :  // limitations under the License.
  14    :  //
  15    :  // This file defines the trace::service::Service class which
  16    :  // implements the call trace service RPC interface.
  17    :  //
  18    :  // TODO(rogerm): Use server controlled context handles to refer to the buffers
  19    :  //     across the RPC boundary. The shared memory handle is client controlled
  20    :  //     and not necessarily unique.
  21    :  
  22    :  #include "syzygy/trace/service/service.h"
  23    :  
  24    :  #include "base/bind.h"
  25    :  #include "base/callback.h"
  26    :  #include "base/string_util.h"
  27    :  #include "base/memory/scoped_ptr.h"
  28    :  #include "sawbuck/common/com_utils.h"
  29    :  #include "syzygy/common/align.h"
  30    :  #include "syzygy/trace/protocol/call_trace_defs.h"
  31    :  #include "syzygy/trace/service/buffer_consumer.h"
  32    :  #include "syzygy/trace/service/session.h"
  33    :  
  34    :  namespace trace {
  35    :  namespace service {
  36    :  
  37    :  const size_t Service::kDefaultBufferSize = 2 * 1024 * 1024;
  38    :  const size_t Service::kDefaultNumIncrementalBuffers = 16;
  39    :  
  40    :  // The choice of this value is not particularly important, but it should be
  41    :  // something that is relatively prime to the number of buffers created per
  42    :  // allocation, and it should represent more memory than our disk bandwidth
  43    :  // can reasonably write in about a second or so, so as to allow sufficient
  44    :  // buffering for smoothing. Assuming 20MB/sec consistent throughput, this
  45    :  // represents about 26 MB, so 1.3 seconds of disk bandwidth.
  46    :  const size_t Service::kDefaultMaxBuffersPendingWrite = 13;
  47    :  
  48    :  Service::Service(BufferConsumerFactory* factory)
  49    :      : num_active_sessions_(0),
  50    :        num_incremental_buffers_(kDefaultNumIncrementalBuffers),
  51    :        buffer_size_in_bytes_(kDefaultBufferSize),
  52    :        max_buffers_pending_write_(kDefaultMaxBuffersPendingWrite),
  53    :        owner_thread_(base::PlatformThread::CurrentId()),
  54    :        buffer_consumer_factory_(factory),
  55    :        a_session_has_closed_(&lock_),
  56    :        rpc_is_initialized_(false),
  57    :        rpc_is_running_(false),
  58    :        rpc_is_non_blocking_(false),
  59  E :        flags_(TRACE_FLAG_BATCH_ENTER) {
  60  E :    DCHECK(factory != NULL);
  61  E :  }
  62    :  
  63  E :  Service::~Service() {
  64  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
  65  E :    DCHECK(buffer_consumer_factory_ != NULL);
  66    :  
  67  E :    Stop();
  68    :  
  69  E :    DCHECK(sessions_.empty());
  70  E :    DCHECK_EQ(0U, num_active_sessions_);
  71  E :  }
  72    :  
  73  E :  void Service::AddOneActiveSession() {
  74  E :    base::AutoLock auto_lock(lock_);
  75    :  
  76  E :    ++num_active_sessions_;
  77  E :  }
  78    :  
  79  E :  void Service::RemoveOneActiveSession() {
  80    :    {
  81  E :      base::AutoLock auto_lock(lock_);
  82  E :      DCHECK_LT(0u, num_active_sessions_);
  83    :  
  84  E :      --num_active_sessions_;
  85  E :    }
  86    :  
  87  E :    a_session_has_closed_.Signal();
  88  E :  }
  89    :  
  90  E :  bool Service::OpenServiceEvent() {
  91  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
  92  E :    DCHECK(!service_event_.IsValid());
  93    :  
  94  E :    std::wstring event_name;
  95  E :    ::GetSyzygyCallTraceRpcEventName(instance_id_, &event_name);
  96    :  
  97  E :    service_event_.Set(::CreateEvent(NULL, TRUE, FALSE, event_name.c_str()));
  98  E :    if (!service_event_.IsValid()) {
  99  E :      DWORD error = ::GetLastError();
 100  E :      LOG(ERROR) << "Failed to create event: " << com::LogWe(error) << ".";
 101  E :      return false;
 102    :    }
 103    :  
 104  E :    return true;
 105  E :  }
 106    :  
 107  E :  bool Service::AcquireServiceMutex() {
 108  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 109  E :    DCHECK(!service_mutex_.IsValid());
 110    :  
 111  E :    std::wstring mutex_name;
 112  E :    ::GetSyzygyCallTraceRpcMutexName(instance_id_, &mutex_name);
 113  E :    base::win::ScopedHandle mutex(::CreateMutex(NULL, FALSE, mutex_name.c_str()));
 114  E :    if (!mutex.IsValid()) {
 115  i :      DWORD error = ::GetLastError();
 116  i :      LOG(ERROR) << "Failed to create mutex: " << com::LogWe(error) << ".";
 117  i :      return false;
 118    :    }
 119  E :    const DWORD kOneSecondInMs = 1000;
 120    :  
 121  E :    switch (::WaitForSingleObject(mutex, kOneSecondInMs)) {
 122    :      case WAIT_ABANDONED:
 123  i :        LOG(WARNING) << "Orphaned service mutex found!";
 124    :        // Fall through...
 125    :  
 126    :      case WAIT_OBJECT_0:
 127  E :        VLOG(1) << "Service mutex acquired.";
 128  E :        service_mutex_.Set(mutex.Take());
 129  E :        return true;
 130    :  
 131    :      case WAIT_TIMEOUT:
 132  E :        LOG(ERROR) << "Another instance of the service is running.";
 133  E :        break;
 134    :  
 135    :      default: {
 136  i :        DWORD error = ::GetLastError();
 137  i :        LOG(ERROR) << "Failed to acquire mutex: " << com::LogWe(error) << ".";
 138    :        break;
 139    :      }
 140    :    }
 141  E :    return false;
 142  E :  }
 143    :  
 144  E :  void Service::ReleaseServiceMutex() {
 145  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 146    :  
 147  E :    if (service_mutex_.IsValid()) {
 148  E :      ::ReleaseMutex(service_mutex_);
 149  E :      service_mutex_.Close();
 150    :    }
 151  E :  }
 152    :  
 153  E :  bool Service::InitializeRpc()  {
 154  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 155    :  
 156  E :    if (rpc_is_initialized_) {
 157  i :      LOG(WARNING) << "The call trace service RPC stack is already initialized.";
 158  i :      return true;
 159    :    }
 160    :  
 161  E :    RPC_STATUS status = RPC_S_OK;
 162    :  
 163    :    // Initialize the RPC protocol we want to use.
 164  E :    std::wstring protocol;
 165  E :    std::wstring endpoint;
 166  E :    ::GetSyzygyCallTraceRpcProtocol(&protocol);
 167  E :    ::GetSyzygyCallTraceRpcEndpoint(instance_id_, &endpoint);
 168    :  
 169  E :    VLOG(1) << "Initializing RPC endpoint '" << endpoint << "' "
 170    :            << "using the '" << protocol << "' protocol.";
 171    :    status = ::RpcServerUseProtseqEp(
 172    :        reinterpret_cast<RPC_WSTR>(&protocol[0]),
 173    :        RPC_C_LISTEN_MAX_CALLS_DEFAULT,
 174    :        reinterpret_cast<RPC_WSTR>(&endpoint[0]),
 175  E :        NULL /* Security descriptor. */);
 176  E :    if (status != RPC_S_OK && status != RPC_S_DUPLICATE_ENDPOINT) {
 177  i :      LOG(ERROR) << "Failed to init RPC protocol: " << com::LogWe(status) << ".";
 178  i :      return false;
 179    :    }
 180    :  
 181    :    // Register the server version of the CallTrace interface.
 182  E :    VLOG(1) << "Registering the CallTrace interface.";
 183    :    status = ::RpcServerRegisterIf(
 184  E :        CallTraceService_CallTrace_v1_0_s_ifspec, NULL, NULL);
 185  E :    if (status != RPC_S_OK) {
 186  i :      LOG(ERROR) << "Failed to register CallTrace RPC interface: "
 187    :                 << com::LogWe(status) << ".";
 188  i :      return false;
 189    :    }
 190    :  
 191    :    // Register the server version of the CallTraceControl interface.
 192  E :    VLOG(1) << "Registering the CallTraceControl interface.";
 193    :    status = ::RpcServerRegisterIf(
 194  E :        CallTraceService_CallTraceControl_v1_0_s_ifspec, NULL, NULL);
 195  E :    if (status != RPC_S_OK) {
 196  i :      LOG(ERROR) << "Failed to register CallTraceControl RPC interface: "
 197    :                 << com::LogWe(status) << ".";
 198  i :      return false;
 199    :    }
 200    :  
 201  E :    rpc_is_initialized_ = true;
 202  E :    return true;
 203  E :  }
 204    :  
 205  E :  bool Service::RunRPC(bool non_blocking) {
 206  E :    VLOG(1) << "Starting the RPC server.";
 207    :  
 208  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 209    :  
 210  E :    if (rpc_is_running_) {
 211  i :      LOG(ERROR) << "The RPC server is already running.";
 212  i :      return false;
 213    :    }
 214    :  
 215  E :    rpc_is_running_ = true;
 216  E :    rpc_is_non_blocking_ = non_blocking;
 217    :  
 218    :    RPC_STATUS status = ::RpcServerListen(
 219    :        1,  // Minimum number of handler threads.
 220    :        RPC_C_LISTEN_MAX_CALLS_DEFAULT,
 221  E :        TRUE);
 222    :  
 223  E :    if (status != RPC_S_OK)
 224  i :      LOG(ERROR) << "Failed to run RPC server: " << com::LogWe(status) << ".";
 225    :  
 226  E :    if (status == RPC_S_OK) {
 227    :      // Signal that the service is up and running.
 228  E :      DCHECK(service_event_.IsValid());
 229  E :      BOOL success = ::SetEvent(service_event_.Get());
 230  E :      DCHECK_EQ(TRUE, success);
 231    :  
 232    :      // Wait here if we're in blocking mode.
 233  E :      if (!non_blocking) {
 234  E :        status = RpcMgmtWaitServerListen();
 235    :  
 236  E :        if (status != RPC_S_OK) {
 237  i :          LOG(ERROR) << "Failed to wait on RPC server: "
 238    :                     << com::LogWe(status) << ".";
 239    :        }
 240    :      }
 241    :    }
 242    :  
 243  E :    if (status != RPC_S_OK) {
 244  i :      rpc_is_running_ = false;
 245  i :      rpc_is_non_blocking_ = false;
 246  i :      return false;
 247    :    }
 248    :  
 249  E :    if (rpc_is_non_blocking_) {
 250  E :      VLOG(1) << "RPC server is running.";
 251    :    }
 252    :  
 253  E :    return true;
 254  E :  }
 255    :  
 256  E :  void Service::StopRpc() {
 257  E :    if (!rpc_is_running_)
 258  E :      return;
 259    :  
 260    :    // Stop the RPC Server.
 261  E :    base::AutoLock auto_lock(lock_);
 262  E :    if (rpc_is_running_) {
 263  E :      VLOG(1) << "Stopping RPC server.";
 264  E :      RPC_STATUS status = ::RpcMgmtStopServerListening(NULL);
 265  E :      if (status != RPC_S_OK) {
 266  i :        LOG(ERROR) << "Failed to stop the RPC server: "
 267    :                   << com::LogWe(status) << ".";
 268    :      }
 269  E :      rpc_is_running_ = false;
 270    :    }
 271  E :  }
 272    :  
 273  E :  void Service::CleanupRpc() {
 274  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 275  E :    DCHECK(rpc_is_running_ == false);
 276    :  
 277  E :    RPC_STATUS status = RPC_S_OK;
 278    :  
 279    :    // If we're running in non-blocking mode, then we have to wait for
 280    :    // any in-flight RPC requests to terminate.
 281  E :    if (rpc_is_non_blocking_) {
 282  E :      VLOG(1) << "Waiting for outstanding RPC requests to terminate.";
 283  E :      status = ::RpcMgmtWaitServerListen();
 284  E :      if (status != RPC_S_OK && status != RPC_S_NOT_LISTENING) {
 285  i :        LOG(ERROR) << "Failed wait for RPC server shutdown: "
 286    :                   << com::LogWe(status) << ".";
 287    :      }
 288  E :      rpc_is_non_blocking_ = false;
 289    :    }
 290    :  
 291    :    // Unregister the RPC interfaces.
 292  E :    if (rpc_is_initialized_) {
 293  E :      VLOG(1) << "Unregistering RPC interfaces.";
 294  E :      status = ::RpcServerUnregisterIf(NULL, NULL, FALSE);
 295  E :      if (status != RPC_S_OK) {
 296  i :        LOG(ERROR) << "Failed to unregister RPC interfaces: "
 297    :                   << com::LogWe(status) << ".";
 298    :      }
 299  E :      rpc_is_initialized_ = false;
 300    :    }
 301  E :  }
 302    :  
 303  E :  bool Service::Start(bool non_blocking) {
 304  E :    LOG(INFO) << "Starting the call-trace service.";
 305    :  
 306  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 307    :  
 308  E :    if (!AcquireServiceMutex())
 309  E :      return false;
 310    :  
 311  E :    if (!OpenServiceEvent())
 312  E :      return false;
 313    :  
 314  E :    if (!InitializeRpc()) {
 315  i :      ReleaseServiceMutex();
 316  i :      return false;
 317    :    }
 318    :  
 319  E :    LOG(INFO) << "The call-trace service is running.";
 320    :  
 321  E :    return RunRPC(non_blocking);
 322  E :  }
 323    :  
 324  E :  bool Service::Stop() {
 325  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 326    :  
 327  E :    LOG(INFO) << "Stopping the call-trace service.";
 328    :  
 329  E :    StopRpc();
 330  E :    CleanupRpc();
 331  E :    CloseAllOpenSessions();
 332  E :    ReleaseServiceMutex();
 333    :  
 334    :    // Signal that we've shut down.
 335  E :    if (service_event_.IsValid())
 336  E :      ::ResetEvent(service_event_.Get());
 337    :  
 338  E :    LOG(INFO) << "The call-trace service is stopped.";
 339  E :    return true;
 340  E :  }
 341    :  
 342  E :  bool Service::CloseAllOpenSessions() {
 343  E :    DCHECK_EQ(owner_thread_, base::PlatformThread::CurrentId());
 344  E :    DCHECK(!rpc_is_running_);
 345    :  
 346  E :    VLOG(1) << "Flushing all outstanding buffers.";
 347    :  
 348  E :    SessionMap to_close;
 349    :    {
 350  E :      base::AutoLock auto_lock(lock_);
 351  E :      to_close.swap(sessions_);
 352    :  
 353  E :      DCHECK(sessions_.empty());
 354  E :    }
 355    :  
 356    :    // Tell each session that they are to be closed. This will get them to
 357    :    // flush all outstanding buffers to their respective consumers.
 358  E :    SessionMap::iterator iter = to_close.begin();
 359  E :    for (; iter != to_close.end(); ++iter) {
 360  E :      iter->second->Close();
 361  E :    }
 362    :  
 363    :    // Release the references we hold to the closing sessions.
 364  E :    to_close.clear();
 365    :  
 366    :    // Wait until all pending sessions have closed.
 367    :    {
 368  E :      base::AutoLock auto_lock(lock_);
 369    :  
 370  E :      int pending_sessions = 0;
 371  E :      while ((pending_sessions = num_active_sessions_) != 0) {
 372  E :        VLOG(1) << "There are " << pending_sessions << " pending sessions.";
 373  E :        a_session_has_closed_.Wait();
 374  E :      }
 375  E :    }
 376    :  
 377  E :    return true;
 378  E :  }
 379    :  
 380  E :  Session* Service::CreateSession() {
 381  E :    return new Session(this);
 382  E :  }
 383    :  
 384    :  // RPC entry point.
 385  E :  bool Service::RequestShutdown() {
 386  E :    VLOG(1) << "Requesting a shutdown of the call trace service.";
 387    :  
 388  E :    StopRpc();
 389    :  
 390  E :    return true;
 391  E :  }
 392    :  
 393    :  // RPC entry point.
 394    :  bool Service::CreateSession(handle_t binding,
 395    :                              SessionHandle* session_handle,
 396    :                              CallTraceBuffer* call_trace_buffer,
 397  E :                              unsigned long* flags) {
 398    :    if (binding == NULL || session_handle == NULL || call_trace_buffer == NULL ||
 399  E :        flags == NULL) {
 400  i :      LOG(WARNING) << "Invalid RPC parameters.";
 401  i :      return false;
 402    :    }
 403  E :    const int kVersion = 2;
 404  E :    RPC_CALL_ATTRIBUTES_V2 attribs = { kVersion, RPC_QUERY_CLIENT_PID };
 405  E :    RPC_STATUS status = RpcServerInqCallAttributes(binding, &attribs);
 406  E :    if (status != RPC_S_OK) {
 407  i :      LOG(ERROR) << "Failed to query RPC call attributes: "
 408    :                 << com::LogWe(status) << ".";
 409  i :      return false;
 410    :    }
 411    :  
 412  E :    ProcessId client_process_id = reinterpret_cast<ProcessId>(attribs.ClientPID);
 413    :  
 414  E :    VLOG(1) << "Registering client process PID=" << client_process_id << ".";
 415    :  
 416  E :    scoped_refptr<Session> session;
 417  E :    if (!GetNewSession(client_process_id, &session))
 418  i :      return false;
 419    :  
 420  E :    DCHECK(session.get() != NULL);
 421    :  
 422    :    // Request a buffer for the client.
 423  E :    Buffer* client_buffer = NULL;
 424  E :    if (!session->GetNextBuffer(&client_buffer)) {
 425  i :      sessions_.erase(session->client_process_id());
 426  i :      session->Close();
 427  i :      return false;
 428    :    }
 429  E :    DCHECK(client_buffer != NULL);
 430    :  
 431    :    // Copy buffer info into the RPC struct, slicing off the private bits.
 432  E :    *session_handle = reinterpret_cast<SessionHandle>(session.get());
 433  E :    *call_trace_buffer = *client_buffer;
 434  E :    *flags = flags_;
 435    :  
 436  E :    return true;
 437  E :  }
 438    :  
 439    :  // RPC entry point.
 440    :  bool Service::AllocateBuffer(SessionHandle session_handle,
 441  E :                               CallTraceBuffer* call_trace_buffer) {
 442  E :    if (session_handle == NULL || call_trace_buffer == NULL) {
 443  i :      LOG(WARNING) << "Invalid RPC parameters.";
 444  i :      return false;
 445    :    }
 446    :  
 447  E :    scoped_refptr<Session> session;
 448  E :    if (!GetExistingSession(session_handle, &session))
 449  i :      return false;
 450  E :    DCHECK(session.get() != NULL);
 451    :  
 452    :    // Request a buffer for the client.
 453  E :    Buffer* client_buffer = NULL;
 454  E :    if (!session->GetNextBuffer(&client_buffer))
 455  i :      return false;
 456    :  
 457    :    // Copy buffer info into the RPC struct, slicing off the private bits.
 458  E :    DCHECK(client_buffer != NULL);
 459  E :    *call_trace_buffer = *client_buffer;
 460    :  
 461  E :    return true;
 462  E :  }
 463    :  
 464    :  // RPC entry point.
 465    :  bool Service::AllocateLargeBuffer(SessionHandle session_handle,
 466    :                                    size_t minimum_size,
 467  E :                                    CallTraceBuffer* call_trace_buffer) {
 468  E :    if (session_handle == NULL || call_trace_buffer == NULL) {
 469  i :      LOG(WARNING) << "Invalid RPC parameters.";
 470  i :      return false;
 471    :    }
 472    :  
 473  E :    scoped_refptr<Session> session;
 474  E :    if (!GetExistingSession(session_handle, &session))
 475  i :      return false;
 476  E :    DCHECK(session.get() != NULL);
 477    :  
 478    :    // Request a buffer for the client.
 479  E :    Buffer* client_buffer = NULL;
 480  E :    if (!session->GetBuffer(minimum_size, &client_buffer))
 481  i :      return false;
 482    :  
 483    :    // Copy buffer info into the RPC struct, slicing off the private bits.
 484  E :    DCHECK(client_buffer != NULL);
 485  E :    *call_trace_buffer = *client_buffer;
 486    :  
 487  E :    return true;
 488  E :  }
 489    :  
 490    :  // RPC entry point.
 491    :  bool Service::CommitAndExchangeBuffer(SessionHandle session_handle,
 492    :                                        CallTraceBuffer* call_trace_buffer,
 493  E :                                        ExchangeFlag perform_exchange) {
 494  E :    if (session_handle == NULL || call_trace_buffer == NULL) {
 495  i :      LOG(WARNING) << "Invalid RPC parameters.";
 496  i :      return false;
 497    :    }
 498    :  
 499    :    DCHECK(perform_exchange == PERFORM_EXCHANGE ||
 500  E :           perform_exchange == DO_NOT_PERFORM_EXCHANGE);
 501    :  
 502  E :    bool result = true;
 503  E :    scoped_refptr<Session> session;
 504  E :    if (!GetExistingSession(session_handle, &session))
 505  i :      return false;
 506  E :    DCHECK(session.get() != NULL);
 507    :  
 508  E :    Buffer* buffer = NULL;
 509  E :    if (!session->FindBuffer(call_trace_buffer, &buffer))
 510  i :      return false;
 511    :  
 512  E :    DCHECK(buffer != NULL);
 513    :  
 514    :    // We can't say anything about the buffer's state, as it possible that the
 515    :    // session that owns it has already been asked to shutdown, in which case
 516    :    // all of its buffers have already been scheduled for writing and the call
 517    :    // below will be ignored.
 518    :  
 519    :    // Return the buffer to the session. The session will then take care of
 520    :    // scheduling it for writing. Currently, it feeds it right back to us, but
 521    :    // this routing allows the write-queue to be decoupled from the service
 522    :    // more easily in the future.
 523  E :    if (!session->ReturnBuffer(buffer)) {
 524  i :      LOG(ERROR) << "Unable to return buffer to session.";
 525  i :      return false;
 526    :    }
 527    :  
 528  E :    ZeroMemory(call_trace_buffer, sizeof(*call_trace_buffer));
 529    :  
 530  E :    if (perform_exchange == PERFORM_EXCHANGE) {
 531    :      // Request a buffer for the client.
 532  E :      Buffer* client_buffer = NULL;
 533  E :      if (!session->GetNextBuffer(&client_buffer)) {
 534  i :        result = false;
 535  i :      } else {
 536    :        // Copy buffer info into the RPC struct, slicing off the private bits.
 537  E :        DCHECK(client_buffer != NULL);
 538  E :        *call_trace_buffer = *client_buffer;
 539    :      }
 540    :    }
 541    :  
 542  E :    return result;
 543  E :  }
 544    :  
 545    :  // RPC entry-point.
 546  E :  bool Service::CloseSession(SessionHandle* session_handle) {
 547  E :    if (session_handle == NULL || *session_handle == NULL) {
 548  i :      LOG(WARNING) << "Invalid RPC parameters.";
 549  i :      return false;
 550    :    }
 551    :  
 552  E :    scoped_refptr<Session> session;
 553    :    {
 554  E :      base::AutoLock auto_lock(lock_);
 555    :  
 556  E :      if (!GetExistingSessionUnlocked(*session_handle, &session))
 557  i :        return false;
 558    :  
 559  E :      size_t num_erased = sessions_.erase(session->client_process_id());
 560  E :      DCHECK_EQ(1U, num_erased);
 561  E :    }
 562    :  
 563  E :    DCHECK(session.get() != NULL);
 564    :  
 565    :    // Signal that we want the session to close. This will cause it to
 566    :    // schedule all of its outstanding buffers for writing. It will destroy
 567    :    // itself once it's reference count drops to zero.
 568  E :    session->Close();
 569    :  
 570  E :    *session_handle = NULL;
 571    :  
 572  E :    return true;
 573  E :  }
 574    :  
 575    :  bool Service::GetNewSession(ProcessId client_process_id,
 576  E :                              scoped_refptr<Session>* session) {
 577  E :    DCHECK(session != NULL);
 578  E :    *session = NULL;
 579    :  
 580    :    // Create the new session.
 581  E :    scoped_refptr<Session> new_session(CreateSession());
 582  E :    if (new_session.get() == NULL)
 583  i :      return false;
 584    :  
 585    :    // Initialize the session.
 586  E :    if (!new_session->Init(client_process_id))
 587  i :      return false;
 588    :  
 589    :    // Allocate a new buffer consumer.
 590  E :    scoped_refptr<BufferConsumer> consumer;
 591  E :    if (!buffer_consumer_factory_->CreateConsumer(&consumer))
 592  i :      return false;
 593    :  
 594    :    // Open the buffer consumer.
 595  E :    if (!consumer->Open(new_session))
 596  i :      return false;
 597    :  
 598    :    // Hand the buffer consumer over to the session. The session will direct
 599    :    // returned buffers to the consumer.
 600  E :    new_session->set_buffer_consumer(consumer);
 601    :  
 602  E :    bool inserted = false;
 603    :    {
 604  E :      base::AutoLock auto_lock(lock_);
 605    :      // Attempt to add the session to the session map.
 606    :      inserted = sessions_.insert(
 607  E :          SessionMap::value_type(client_process_id, new_session)).second;
 608  E :    }
 609    :  
 610  E :    if (inserted == false) {
 611  i :      LOG(ERROR) << "A session already exists for process " << client_process_id
 612    :          << ".";
 613  i :      consumer->Close(new_session.get());
 614  i :      CHECK(new_session->Close());
 615    :  
 616  i :      return false;
 617    :    }
 618    :  
 619    :    // The session map has taken ownership of the session object; release
 620    :    // and return the session pointer.
 621  E :    *session = new_session;
 622    :  
 623  E :    return true;
 624  E :  }
 625    :  
 626    :  bool Service::GetExistingSession(SessionHandle session_handle,
 627  E :                                   scoped_refptr<Session>* session) {
 628  E :    DCHECK(session != NULL);
 629  E :    base::AutoLock auto_lock(lock_);
 630    :  
 631  E :    return GetExistingSessionUnlocked(session_handle, session);
 632  E :  }
 633    :  
 634    :  bool Service::GetExistingSessionUnlocked(SessionHandle session_handle,
 635  E :                                           scoped_refptr<Session>* session) {
 636  E :    DCHECK(session != NULL);
 637  E :    lock_.AssertAcquired();
 638    :  
 639  E :    *session = reinterpret_cast<Session*>(session_handle);
 640    :  
 641    :  #ifndef NDEBUG
 642  E :    if (sessions_.find((*session)->client_process_id()) == sessions_.end()) {
 643  i :      LOG(ERROR) << "No session exists for handle " << session_handle << ".";
 644  i :      *session = static_cast<Session*>(NULL);
 645  i :      return false;
 646    :    }
 647    :  #endif
 648    :  
 649  E :    return true;
 650  E :  }
 651    :  
 652    :  }  // namespace service
 653    :  }  // namespace trace

Coverage information generated Thu Jul 04 09:34:53 2013.