Logo Search packages:      
Sourcecode: chromium-browser version File versions  Download package

webmessageportchannel_impl.cc

// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/common/webmessageportchannel_impl.h"

#include "chrome/common/child_process.h"
#include "chrome/common/child_thread.h"
#include "chrome/common/worker_messages.h"
#include "third_party/WebKit/WebKit/chromium/public/WebString.h"
#include "third_party/WebKit/WebKit/chromium/public/WebMessagePortChannelClient.h"

using WebKit::WebMessagePortChannel;
using WebKit::WebMessagePortChannelArray;
using WebKit::WebMessagePortChannelClient;
using WebKit::WebString;

WebMessagePortChannelImpl::WebMessagePortChannelImpl()
    : client_(NULL),
      route_id_(MSG_ROUTING_NONE),
      message_port_id_(MSG_ROUTING_NONE) {
  AddRef();
  Init();
}

WebMessagePortChannelImpl::WebMessagePortChannelImpl(
    int route_id,
    int message_port_id)
    : client_(NULL),
      route_id_(route_id),
      message_port_id_(message_port_id) {
  AddRef();
  Init();
}

WebMessagePortChannelImpl::~WebMessagePortChannelImpl() {
  // If we have any queued messages with attached ports, manually destroy them.
  while (!message_queue_.empty()) {
    const std::vector<WebMessagePortChannelImpl*>& channel_array =
        message_queue_.front().ports;
    for (size_t i = 0; i < channel_array.size(); i++) {
      channel_array[i]->destroy();
    }
    message_queue_.pop();
  }

  if (message_port_id_ != MSG_ROUTING_NONE)
    Send(new WorkerProcessHostMsg_DestroyMessagePort(message_port_id_));

  if (route_id_ != MSG_ROUTING_NONE)
    ChildThread::current()->RemoveRoute(route_id_);
}

void WebMessagePortChannelImpl::setClient(WebMessagePortChannelClient* client) {
  // Must lock here since client_ is called on the main thread.
  AutoLock auto_lock(lock_);
  client_ = client;
}

void WebMessagePortChannelImpl::destroy() {
  setClient(NULL);

  // Release the object on the main thread, since the destructor might want to
  // send an IPC, and that has to happen on the main thread.
  ChildThread::current()->message_loop()->ReleaseSoon(FROM_HERE, this);
}

void WebMessagePortChannelImpl::entangle(WebMessagePortChannel* channel) {
  // The message port ids might not be set up yet, if this channel wasn't
  // created on the main thread.  So need to wait until we're on the main thread
  // before getting the other message port id.
  scoped_refptr<WebMessagePortChannelImpl> webchannel =
      static_cast<WebMessagePortChannelImpl*>(channel);
  Entangle(webchannel);
}

void WebMessagePortChannelImpl::postMessage(
    const WebString& message,
    WebMessagePortChannelArray* channels) {
  if (MessageLoop::current() != ChildThread::current()->message_loop()) {
    ChildThread::current()->message_loop()->PostTask(FROM_HERE,
        NewRunnableMethod(this, &WebMessagePortChannelImpl::postMessage,
            message, channels));
    return;
  }

  std::vector<int> message_port_ids(channels ? channels->size() : 0);
  if (channels) {
    // Extract the port IDs from the source array, then free it.
    for (size_t i = 0; i < channels->size(); ++i) {
      WebMessagePortChannelImpl* webchannel =
          static_cast<WebMessagePortChannelImpl*>((*channels)[i]);
      message_port_ids[i] = webchannel->message_port_id();
      webchannel->QueueMessages();
      DCHECK(message_port_ids[i] != MSG_ROUTING_NONE);
    }
    delete channels;
  }

  IPC::Message* msg = new WorkerProcessHostMsg_PostMessage(
      message_port_id_, message, message_port_ids);
  Send(msg);
}

bool WebMessagePortChannelImpl::tryGetMessage(
    WebString* message,
    WebMessagePortChannelArray& channels) {
  AutoLock auto_lock(lock_);
  if (message_queue_.empty())
    return false;

  *message = message_queue_.front().message;
  const std::vector<WebMessagePortChannelImpl*>& channel_array =
      message_queue_.front().ports;
  WebMessagePortChannelArray result_ports(channel_array.size());
  for (size_t i = 0; i < channel_array.size(); i++) {
    result_ports[i] = channel_array[i];
  }

  channels.swap(result_ports);
  message_queue_.pop();
  return true;
}

void WebMessagePortChannelImpl::Init() {
  if (MessageLoop::current() != ChildThread::current()->message_loop()) {
    ChildThread::current()->message_loop()->PostTask(FROM_HERE,
        NewRunnableMethod(this, &WebMessagePortChannelImpl::Init));
    return;
  }

  if (route_id_ == MSG_ROUTING_NONE) {
    DCHECK(message_port_id_ == MSG_ROUTING_NONE);
    Send(new WorkerProcessHostMsg_CreateMessagePort(
        &route_id_, &message_port_id_));
  }

  ChildThread::current()->AddRoute(route_id_, this);
}

void WebMessagePortChannelImpl::Entangle(
    scoped_refptr<WebMessagePortChannelImpl> channel) {
  if (MessageLoop::current() != ChildThread::current()->message_loop()) {
    ChildThread::current()->message_loop()->PostTask(FROM_HERE,
        NewRunnableMethod(this, &WebMessagePortChannelImpl::Entangle, channel));
    return;
  }

  Send(new WorkerProcessHostMsg_Entangle(
      message_port_id_, channel->message_port_id()));
}

void WebMessagePortChannelImpl::QueueMessages() {
  if (MessageLoop::current() != ChildThread::current()->message_loop()) {
    ChildThread::current()->message_loop()->PostTask(FROM_HERE,
        NewRunnableMethod(this, &WebMessagePortChannelImpl::QueueMessages));
    return;
  }
  // This message port is being sent elsewhere (perhaps to another process).
  // The new endpoint needs to receive the queued messages, including ones that
  // could still be in-flight.  So we tell the browser to queue messages, and it
  // sends us an ack, whose receipt we know means that no more messages are
  // in-flight.  We then send the queued messages to the browser, which prepends
  // them to the ones it queued and it sends them to the new endpoint.
  Send(new WorkerProcessHostMsg_QueueMessages(message_port_id_));

  // The process could potentially go away while we're still waiting for
  // in-flight messages.  Ensure it stays alive.
  ChildProcess::current()->AddRefProcess();
}

void WebMessagePortChannelImpl::Send(IPC::Message* message) {
  if (MessageLoop::current() != ChildThread::current()->message_loop()) {
    DCHECK(!message->is_sync());
    ChildThread::current()->message_loop()->PostTask(FROM_HERE,
        NewRunnableMethod(this, &WebMessagePortChannelImpl::Send, message));
    return;
  }

  ChildThread::current()->Send(message);
}

void WebMessagePortChannelImpl::OnMessageReceived(const IPC::Message& message) {
  IPC_BEGIN_MESSAGE_MAP(WebMessagePortChannelImpl, message)
    IPC_MESSAGE_HANDLER(WorkerProcessMsg_Message, OnMessage)
    IPC_MESSAGE_HANDLER(WorkerProcessMsg_MessagesQueued, OnMessagedQueued)
  IPC_END_MESSAGE_MAP()
}

void WebMessagePortChannelImpl::OnMessage(
    const string16& message,
    const std::vector<int>& sent_message_port_ids,
    const std::vector<int>& new_routing_ids) {
  AutoLock auto_lock(lock_);
  Message msg;
  msg.message = message;
  if (!sent_message_port_ids.empty()) {
    msg.ports.resize(sent_message_port_ids.size());
    for (size_t i = 0; i < sent_message_port_ids.size(); ++i) {
      msg.ports[i] = new WebMessagePortChannelImpl(
          new_routing_ids[i], sent_message_port_ids[i]);
    }
  }

  bool was_empty = message_queue_.empty();
  message_queue_.push(msg);
  if (client_ && was_empty)
    client_->messageAvailable();
}

void WebMessagePortChannelImpl::OnMessagedQueued() {
  std::vector<QueuedMessage> queued_messages;

  {
    AutoLock auto_lock(lock_);
    queued_messages.reserve(message_queue_.size());
    while (!message_queue_.empty()) {
      string16 message = message_queue_.front().message;
      const std::vector<WebMessagePortChannelImpl*>& channel_array =
          message_queue_.front().ports;
      std::vector<int> port_ids(channel_array.size());
      for (size_t i = 0; i < channel_array.size(); ++i) {
        port_ids[i] = channel_array[i]->message_port_id();
      }
      queued_messages.push_back(std::make_pair(message, port_ids));
      message_queue_.pop();
    }
  }

  Send(new WorkerProcessHostMsg_SendQueuedMessages(
      message_port_id_, queued_messages));

  message_port_id_ = MSG_ROUTING_NONE;

  Release();
  ChildProcess::current()->ReleaseProcess();
}

Generated by  Doxygen 1.6.0   Back to index