Logo Search packages:      
Sourcecode: chromium-browser version File versions  Download package

message_port_dispatcher.cc

// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/worker_host/message_port_dispatcher.h"

#include "base/callback.h"
#include "base/singleton.h"
#include "chrome/browser/chrome_thread.h"
#include "chrome/browser/renderer_host/resource_message_filter.h"
#include "chrome/browser/worker_host/worker_process_host.h"
#include "chrome/common/notification_service.h"
#include "chrome/common/worker_messages.h"


MessagePortDispatcher* MessagePortDispatcher::GetInstance() {
  return Singleton<MessagePortDispatcher>::get();
}

MessagePortDispatcher::MessagePortDispatcher()
    : next_message_port_id_(0),
      sender_(NULL),
      next_routing_id_(NULL) {
  // Receive a notification if a message filter or WorkerProcessHost is deleted.
  registrar_.Add(this, NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN,
                 NotificationService::AllSources());

  registrar_.Add(this, NotificationType::WORKER_PROCESS_HOST_SHUTDOWN,
                 NotificationService::AllSources());
}

MessagePortDispatcher::~MessagePortDispatcher() {
}

bool MessagePortDispatcher::OnMessageReceived(
    const IPC::Message& message,
    IPC::Message::Sender* sender,
    CallbackWithReturnValue<int>::Type* next_routing_id,
    bool* message_was_ok) {
  sender_ = sender;
  next_routing_id_ = next_routing_id;

  bool handled = true;
  *message_was_ok = true;

  IPC_BEGIN_MESSAGE_MAP_EX(MessagePortDispatcher, message, *message_was_ok)
    IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_CreateMessagePort, OnCreate)
    IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_DestroyMessagePort, OnDestroy)
    IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_Entangle, OnEntangle)
    IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_PostMessage, OnPostMessage)
    IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_QueueMessages, OnQueueMessages)
    IPC_MESSAGE_HANDLER(WorkerProcessHostMsg_SendQueuedMessages,
                        OnSendQueuedMessages)
    IPC_MESSAGE_UNHANDLED(handled = false)
  IPC_END_MESSAGE_MAP_EX()

  sender_ = NULL;
  next_routing_id_ = NULL;

  return handled;
}

void MessagePortDispatcher::UpdateMessagePort(
    int message_port_id,
    IPC::Message::Sender* sender,
    int routing_id,
    CallbackWithReturnValue<int>::Type* next_routing_id) {
  DCHECK(CheckMessagePortMap(true));
  if (!message_ports_.count(message_port_id)) {
    NOTREACHED();
    return;
  }

  MessagePort& port = message_ports_[message_port_id];
  port.sender = sender;
  port.route_id = routing_id;
  port.next_routing_id = next_routing_id;
  DCHECK(CheckMessagePortMap(true));
}

bool MessagePortDispatcher::Send(IPC::Message* message) {
  DCHECK(CheckMessagePortMap(true));
  return sender_->Send(message);
}

void MessagePortDispatcher::OnCreate(int *route_id,
                                     int* message_port_id) {
  DCHECK(CheckMessagePortMap(true));
  *message_port_id = ++next_message_port_id_;
  *route_id = next_routing_id_->Run();

  MessagePort port;
  port.sender = sender_;
  port.route_id = *route_id;
  port.next_routing_id = next_routing_id_;
  port.message_port_id = *message_port_id;
  port.entangled_message_port_id = MSG_ROUTING_NONE;
  port.queue_messages = false;
  message_ports_[*message_port_id] = port;
  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::OnDestroy(int message_port_id) {
  DCHECK(CheckMessagePortMap(true));
  if (!message_ports_.count(message_port_id)) {
    NOTREACHED();
    return;
  }

  DCHECK(message_ports_[message_port_id].queued_messages.empty());
  Erase(message_port_id);
  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::OnEntangle(int local_message_port_id,
                                       int remote_message_port_id) {
  DCHECK(CheckMessagePortMap(false));
  if (!message_ports_.count(local_message_port_id) ||
      !message_ports_.count(remote_message_port_id)) {
    NOTREACHED();
    return;
  }

  DCHECK(message_ports_[remote_message_port_id].entangled_message_port_id ==
      MSG_ROUTING_NONE);
  message_ports_[remote_message_port_id].entangled_message_port_id =
      local_message_port_id;
  DCHECK(CheckMessagePortMap(false));
}

void MessagePortDispatcher::OnPostMessage(
    int sender_message_port_id,
    const string16& message,
    const std::vector<int>& sent_message_port_ids) {
  DCHECK(CheckMessagePortMap(true));
  if (!message_ports_.count(sender_message_port_id)) {
    NOTREACHED();
    return;
  }

  int entangled_message_port_id =
      message_ports_[sender_message_port_id].entangled_message_port_id;
  if (entangled_message_port_id == MSG_ROUTING_NONE)
    return;  // Process could have crashed.

  if (!message_ports_.count(entangled_message_port_id)) {
    NOTREACHED();
    return;
  }

  PostMessageTo(entangled_message_port_id, message, sent_message_port_ids);
  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::PostMessageTo(
    int message_port_id,
    const string16& message,
    const std::vector<int>& sent_message_port_ids) {
  DCHECK(CheckMessagePortMap(true));
  if (!message_ports_.count(message_port_id)) {
    NOTREACHED();
    return;
  }
  for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    if (!message_ports_.count(sent_message_port_ids[i])) {
      NOTREACHED();
      return;
    }
  }

  MessagePort& entangled_port = message_ports_[message_port_id];

  std::vector<MessagePort*> sent_ports(sent_message_port_ids.size());
  for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
    sent_ports[i] = &message_ports_[sent_message_port_ids[i]];
    sent_ports[i]->queue_messages = true;
  }

  if (entangled_port.queue_messages) {
    entangled_port.queued_messages.push_back(
        std::make_pair(message, sent_message_port_ids));
  } else {
    // If a message port was sent around, the new location will need a routing
    // id.  Instead of having the created port send us a sync message to get it,
    // send along with the message.
    std::vector<int> new_routing_ids(sent_message_port_ids.size());
    for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
      new_routing_ids[i] = entangled_port.next_routing_id->Run();
      sent_ports[i]->sender = entangled_port.sender;

      // Update the entry for the sent port as it can be in a different process.
      sent_ports[i]->route_id = new_routing_ids[i];
    }

    // Now send the message to the entangled port.
    IPC::Message* ipc_msg = new WorkerProcessMsg_Message(
        entangled_port.route_id, message, sent_message_port_ids,
        new_routing_ids);
    entangled_port.sender->Send(ipc_msg);
  }
  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::OnQueueMessages(int message_port_id) {
  DCHECK(CheckMessagePortMap(true));
  if (!message_ports_.count(message_port_id)) {
    NOTREACHED();
    return;
  }

  MessagePort& port = message_ports_[message_port_id];
  port.sender->Send(new WorkerProcessMsg_MessagesQueued(port.route_id));
  port.queue_messages = true;
  port.sender = NULL;
  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::OnSendQueuedMessages(
    int message_port_id,
    const QueuedMessages& queued_messages) {
  DCHECK(CheckMessagePortMap(true));
  if (!message_ports_.count(message_port_id)) {
    NOTREACHED();
    return;
  }

  // Send the queued messages to the port again.  This time they'll reach the
  // new location.
  MessagePort& port = message_ports_[message_port_id];
  port.queue_messages = false;
  port.queued_messages.insert(port.queued_messages.begin(),
                              queued_messages.begin(),
                              queued_messages.end());
  SendQueuedMessagesIfPossible(message_port_id);
  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::SendQueuedMessagesIfPossible(int message_port_id) {
  DCHECK(CheckMessagePortMap(true));
  if (!message_ports_.count(message_port_id)) {
    NOTREACHED();
    return;
  }

  MessagePort& port = message_ports_[message_port_id];
  if (port.queue_messages || !port.sender)
    return;

  for (QueuedMessages::iterator iter = port.queued_messages.begin();
       iter != port.queued_messages.end(); ++iter) {
    PostMessageTo(message_port_id, iter->first, iter->second);
  }
  port.queued_messages.clear();
  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::Observe(NotificationType type,
                                    const NotificationSource& source,
                                    const NotificationDetails& details) {
  DCHECK(CheckMessagePortMap(true));

  IPC::Message::Sender* sender = NULL;
  if (type.value == NotificationType::RESOURCE_MESSAGE_FILTER_SHUTDOWN) {
    sender = Source<ResourceMessageFilter>(source).ptr();
  } else if (type.value == NotificationType::WORKER_PROCESS_HOST_SHUTDOWN) {
    sender = Source<WorkerProcessHost>(source).ptr();
  } else {
    NOTREACHED();
  }

  // Check if the (possibly) crashed process had any message ports.
  for (MessagePorts::iterator iter = message_ports_.begin();
       iter != message_ports_.end();) {
    MessagePorts::iterator cur_item = iter++;
    if (cur_item->second.sender == sender) {
      Erase(cur_item->first);
    }
  }

  DCHECK(CheckMessagePortMap(true));
}

void MessagePortDispatcher::Erase(int message_port_id) {
  MessagePorts::iterator erase_item = message_ports_.find(message_port_id);
  DCHECK(erase_item != message_ports_.end());

  int entangled_id = erase_item->second.entangled_message_port_id;
  if (entangled_id != MSG_ROUTING_NONE) {
    // Do the disentanglement (and be paranoid about the other side existing
    // just in case something unusual happened during entanglement).
    if (message_ports_.count(entangled_id)) {
      message_ports_[entangled_id].entangled_message_port_id = MSG_ROUTING_NONE;
    }
  }
  message_ports_.erase(erase_item);
}

#ifndef NDEBUG
bool MessagePortDispatcher::CheckMessagePortMap(bool check_entanglements) {
  DCHECK(ChromeThread::CurrentlyOn(ChromeThread::IO));

  for (MessagePorts::iterator iter = message_ports_.begin();
       iter != message_ports_.end(); iter++) {
    DCHECK(iter->first <= next_message_port_id_);
    DCHECK(iter->first == iter->second.message_port_id);

    int entangled_id = iter->second.entangled_message_port_id;
    if (check_entanglements && entangled_id != MSG_ROUTING_NONE) {
      MessagePorts::iterator entangled_item = message_ports_.find(entangled_id);
      DCHECK(entangled_item != message_ports_.end());
      DCHECK(entangled_item->second.entangled_message_port_id == iter->first);
    }
  }
  return true;
}
#endif  // NDEBUG

Generated by  Doxygen 1.6.0   Back to index