Logo Search packages:      
Sourcecode: chromium-browser version File versions  Download package


// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/sync/engine/syncer_thread.h"

#include "build/build_config.h"

#if defined(OS_MACOSX)
#include <CoreFoundation/CFNumber.h>
#include <IOKit/IOTypes.h>
#include <IOKit/IOKitLib.h>

#include <algorithm>
#include <map>
#include <queue>

#include "base/third_party/dynamic_annotations/dynamic_annotations.h"
#include "chrome/browser/sync/engine/auth_watcher.h"
#include "chrome/browser/sync/engine/model_safe_worker.h"
#include "chrome/browser/sync/engine/net/server_connection_manager.h"
#include "chrome/browser/sync/engine/syncer.h"
#include "chrome/browser/sync/syncable/directory_manager.h"
#include "chrome/common/chrome_switches.h"
#include "jingle/notifier/listener/notification_constants.h"

using std::priority_queue;
using std::min;
using base::Time;
using base::TimeDelta;
using base::TimeTicks;

namespace browser_sync {

// We use high values here to ensure that failure to receive poll updates from
// the server doesn't result in rapid-fire polling from the client due to low
// local limits.
// Short poll default: 8 hours, expressed in seconds.
const int SyncerThread::kDefaultShortPollIntervalSeconds = 3600 * 8;
// Long poll default: 12 hours, expressed in seconds.
const int SyncerThread::kDefaultLongPollIntervalSeconds = 3600 * 12;

// TODO(tim): This is used to regulate the short poll (when notifications are
// disabled) based on user idle time.  If it is set to a smaller value than
// the short poll interval, it basically does nothing; for now, this is what
// we want and allows stronger control over the poll rate from the server. We
// should probably re-visit this code later and figure out if user idle time
// is really something we want and make sure it works, if it is.
// Default maximum: 30 minutes, expressed in milliseconds.
const int SyncerThread::kDefaultMaxPollIntervalMs = 30 * 60 * 1000;

// Public entry point for requesting a sync |milliseconds_from_now| ms in the
// future on behalf of |source|.  Acquires lock_ and forwards to
// NudgeSyncImpl(), which must be called with the lock held.
void SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
  AutoLock lock(lock_);
  // NOTE(review): the body of this NULL-syncer check is not visible in this
  // copy of the file; presumably it bails out early -- confirm upstream.
  if (vault_.syncer_ == NULL) {

  NudgeSyncImpl(milliseconds_from_now, source);

// Constructs a SyncerThread from |context|.  Creates the event channel that
// relay_channel() hands out and performs extra setup when |context| supplies
// a directory manager and/or a connection manager.
// NOTE(review): parts of this constructor (the statements guarded by the two
// conditionals, and any use of |all_status|) are not visible in this copy of
// the file -- confirm against upstream.
SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
    AllStatus* all_status)
    : thread_main_started_(false, false),
      disable_idle_detection_(false) {
  syncer_event_relay_channel_.reset(new SyncerEventChannel());

  // Only hook into the directory manager's channel when one is provided.
  if (context->directory_manager()) {
        context->directory_manager()->channel(), this,

  // Likewise, connection-manager setup is conditional on one being provided.
  if (context->connection_manager())


// Frees the Syncer held in |vault_.syncer_| (allocated with new in
// HandleDirectoryManagerEvent()).  Deleting a NULL pointer is a no-op, so
// this is fine when no Syncer was ever created.
SyncerThread::~SyncerThread() {
  delete vault_.syncer_;

// Creates and starts a syncer thread.
// Returns true if it creates a thread or if there's currently a thread running
// and false otherwise.
bool SyncerThread::Start() {
    AutoLock lock(lock_);
    // Already running: report success without starting a second thread.
    if (thread_.IsRunning()) {
      return true;

    // The underlying thread failed to start.
    if (!thread_.Start()) {
      return false;

  // Queue a task bound to |this| onto the new thread's message loop.
  // NOTE(review): the method being posted is not visible in this copy of the
  // file; presumably ThreadMain -- confirm upstream.
  thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,

  // Wait for notification that our task makes it safely onto the message
  // loop before returning, so the caller can't call Stop before we're
  // actually up and running.  This is for consistency with the old pthread
  // impl because pthread_create would do this in one step.
  // NOTE(review): the wait statement itself is not visible in this copy of the
  // file -- confirm upstream.
  LOG(INFO) << "SyncerThread started.";
  return true;

// Stop processing. A max wait of at least 2*server RTT time is recommended.
// Returns true if we stopped, false otherwise.
// NOTE(review): only the `return true` path is visible in this copy of the
// file, and |max_wait| is not referenced by any visible line -- confirm
// against upstream.
bool SyncerThread::Stop(int max_wait) {

  // This will join, and finish when ThreadMain terminates.
  return true;

// Shutdown helper: under lock_, sets |vault_.stop_syncer_thread_| so that the
// loop in ThreadMainLoop() terminates, and handles an existing Syncer so it
// can exit early.
// NOTE(review): the early-out statement, the call that tells the Syncer to
// exit, and the signalling of waiters are not visible in this copy of the
// file -- confirm against upstream.
void SyncerThread::RequestSyncerExitAndSetThreadStopConditions() {
    AutoLock lock(lock_);
    // If the thread has been started, then we either already have or are about
    // to enter ThreadMainLoop so we have to proceed with shutdown and wait for
    // it to finish.  If the thread has not been started --and we now own the
    // lock-- then we can early out because the caller has not called Start().
    if (!thread_.IsRunning())

    LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
              << "true (vault_.stop_syncer_thread_)";
    // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
    // below).
    vault_.stop_syncer_thread_ = true;
    if (NULL != vault_.syncer_) {
      // Try to early exit the syncer itself, which could be looping inside
      // SyncShare.

    // stop_syncer_thread_ is now true and the Syncer has been told to exit.
    // We want to wake up all waiters so they can re-examine state. We signal,
    // causing all waiters to try to re-acquire the lock, and then we release
    // the lock, and join on our internal thread which should soon run off the
    // end of ThreadMain.

// Asks the syncer thread to pause.
// Returns false if a pause is already pending or in effect; otherwise returns
// true after either recording the request (thread running) or handling the
// not-running case directly.
bool SyncerThread::RequestPause() {
  AutoLock lock(lock_);
  // Nothing to do if we are already paused or about to be.
  if (vault_.pause_requested_ || vault_.paused_)
    return false;

  if (thread_.IsRunning()) {
    // Set the pause request.  The syncer thread will read this
    // request, enter the paused state, and send the PAUSED
    // notification.
    vault_.pause_requested_ = true;
    LOG(INFO) << "Pause requested.";
  } else {
    // If the thread is not running, go directly into the paused state
    // and notify.
    // NOTE(review): the state change and notification are not visible in this
    // copy of the file -- confirm upstream.
    LOG(INFO) << "Paused while not running.";
  return true;

// Asks the syncer thread to resume after a pause (or cancels a pause that is
// still pending).
// Returns false if we are neither paused nor have a pause pending; true
// otherwise.
bool SyncerThread::RequestResume() {
  AutoLock lock(lock_);
  // Only valid to request a resume when we are already paused or we
  // have a pause pending.
  if (!(vault_.paused_ || vault_.pause_requested_))
    return false;

  if (thread_.IsRunning()) {
    if (vault_.pause_requested_) {
      // If pause was requested we have not yet paused.  In this case,
      // the resume cancels the pause request.
      // NOTE(review): clearing of pause_requested_ and dispatch of |event| are
      // not visible in this copy of the file -- confirm upstream.
      SyncerEvent event(SyncerEvent::RESUMED);
      LOG(INFO) << "Pending pause canceled by resume.";
    } else {
      // Unpause and notify.
      vault_.paused_ = false;
  } else {
    // NOTE(review): the state change for the not-running case is not visible
    // in this copy of the file -- confirm upstream.
    LOG(INFO) << "Resumed while not running.";
  return true;

// Stores an updated long poll interval, converted to an int, in
// |syncer_long_poll_interval_seconds_|.
// NOTE(review): the conversion expression applied to |new_interval| is not
// visible in this copy of the file; the member name suggests seconds --
// confirm upstream.
void SyncerThread::OnReceivedLongPollIntervalUpdate(
    const base::TimeDelta& new_interval) {
  syncer_long_poll_interval_seconds_ = static_cast<int>(

// Stores an updated short poll interval, converted to an int, in
// |syncer_short_poll_interval_seconds_|.
// NOTE(review): the conversion expression applied to |new_interval| is not
// visible in this copy of the file; the member name suggests seconds --
// confirm upstream.
void SyncerThread::OnReceivedShortPollIntervalUpdate(
    const base::TimeDelta& new_interval) {
  syncer_short_poll_interval_seconds_ = static_cast<int>(

// Records |silenced_until| in |silenced_until_|.  CalculatePollingWaitTime()
// treats a non-null value as throttling, and IsSyncingCurrentlySilenced()
// compares it against the current time.
void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
  silenced_until_ = silenced_until;

// Returns true when |silenced_until_| is at or after the current time, i.e.
// the remaining silenced duration is non-negative.
bool SyncerThread::IsSyncingCurrentlySilenced() {
  // We should ignore reads from silenced_until_ under ThreadSanitizer
  // since this is a benign race.
  // NOTE(review): no annotation macro is visible around this read in this
  // copy of the file -- confirm upstream.
  bool ret = (silenced_until_ - TimeTicks::Now()) >= TimeDelta::FromSeconds(0);
  return ret;

// Builds a STOP_SYNCING_PERMANENTLY SyncerEvent.
// NOTE(review): the statements before the event is built and the dispatch of
// |event| are not visible in this copy of the file -- confirm upstream.
void SyncerThread::OnShouldStopSyncingPermanently() {

  SyncerEvent event(SyncerEvent::STOP_SYNCING_PERMANENTLY);

// Main loop of the syncer thread.  Repeats until |vault_.stop_syncer_thread_|
// is set: handles the disconnected and paused states, works out how long to
// wait (next poll time, or an earlier pending nudge when not throttled),
// records the sync time, and recomputes the wait interval through
// CalculatePollingWaitTime().
// NOTE(review): a number of statements (the condition-variable waits, the
// calls into WaitUntilConnectedOrQuit/PauseUntilResumedOrQuit/SyncMain,
// closing braces and #endif lines) are not visible in this copy of the file --
// confirm against upstream before relying on the details below.
void SyncerThread::ThreadMainLoop() {
  // This is called with lock_ acquired.
  LOG(INFO) << "In thread main loop.";

  // Use the short poll value by default.
  vault_.current_wait_interval_.poll_delta =
  // Loop-carried state: the idle time last fed into the wait calculation, the
  // time of the previous sync, whether this is the first pass, and whether
  // the previous cycle left work unfinished.
  int user_idle_milliseconds = 0;
  TimeTicks last_sync_time;
  bool initial_sync_for_thread = true;
  bool continue_sync_cycle = false;

#if defined(OS_LINUX)
  // Linux uses a helper object for idle-time queries (see UserIdleTime()).
  idle_query_.reset(new IdleQueryLinux());

  // No Syncer exists until the directory has been opened (see
  // HandleDirectoryManagerEvent()), so block until it shows up or we are told
  // to stop.
  if (vault_.syncer_ == NULL) {
    LOG(INFO) << "Syncer thread waiting for database initialization.";
    while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
    LOG_IF(INFO, !(vault_.syncer_ == NULL))
        << "Syncer was found after DB started.";

  while (!vault_.stop_syncer_thread_) {
    // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
    // below) because we cannot poll until these conditions are met, so we wait
    // indefinitely.

    // If we are not connected, enter WaitUntilConnectedOrQuit() which
    // will return only when the network is connected or a quit is
    // requested.  Note that it is possible to exit
    // WaitUntilConnectedOrQuit() in the paused state which will be
    // handled by the next statement.
    if (!vault_.connected_ && !initial_sync_for_thread) {

    // Check if we should be paused or if a pause was requested.  Note
    // that we don't check initial_sync_for_thread here since we want
    // the pause to happen regardless if it is the initial sync or not.
    if (vault_.pause_requested_ || vault_.paused_) {

    const TimeTicks next_poll = last_sync_time +
    bool throttled = vault_.current_wait_interval_.mode ==
    // If we are throttled, we must wait.  Otherwise, wait until either the next
    // nudge (if one exists) or the poll interval.
    TimeTicks end_wait = next_poll;
    if (!throttled && !vault_.pending_nudge_time_.is_null()) {
      end_wait = std::min(end_wait, vault_.pending_nudge_time_);
    LOG(INFO) << "end_wait is " << end_wait.ToInternalValue();
    LOG(INFO) << "next_poll is " << next_poll.ToInternalValue();

    // We block until the CV is signaled (e.g a control field changed, loss of
    // network connection, nudge, spurious, etc), or the poll interval elapses.
    // The very first pass skips the wait so the initial sync happens at once.
    TimeDelta sleep_time = end_wait - TimeTicks::Now();
    if (!initial_sync_for_thread && sleep_time > TimeDelta::FromSeconds(0)) {

      if (TimeTicks::Now() < end_wait) {
        // Didn't timeout. Could be a spurious signal, or a signal corresponding
        // to an actual change in one of our control fields.  By continuing here
        // we perform the typical "always recheck conditions when signaled",
        // (typically handled by a while(condition_not_met) cv.wait() construct)
        // because we jump to the top of the loop.  The main difference is we
        // recalculate the wait interval, but last_sync_time won't have changed.
        // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
        // off the queue and wait for that delta.  If it was a spurious signal,
        // we'll keep waiting for the same moment in time as we just were.

    // Handle a nudge, caused by either a notification or a local bookmark
    // event.  This will also update the source of the following SyncMain call.
    bool nudged = UpdateNudgeSource(throttled, continue_sync_cycle,

    LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
    last_sync_time = TimeTicks::Now();

    LOG(INFO) << "Updating the next polling time after SyncMain";
    vault_.current_wait_interval_ = CalculatePollingWaitTime(
        &user_idle_milliseconds, &continue_sync_cycle, nudged);
#if defined(OS_LINUX)

// Loops until either |vault_.connected_| becomes true or
// |vault_.stop_syncer_thread_| is set, tracking pause requests that arrive
// while disconnected.  Builds a WAITING_FOR_CONNECTION event on entry and a
// CONNECTED event on exit when not shutting down.
// NOTE(review): event dispatch, the pause state transitions and the wait
// inside the loop are not visible in this copy of the file -- confirm
// upstream.
void SyncerThread::WaitUntilConnectedOrQuit() {
  LOG(INFO) << "Syncer thread waiting for connection.";
  SyncerEvent event(SyncerEvent::WAITING_FOR_CONNECTION);

  // Local mirror of the paused state, used to detect transitions in the loop.
  bool is_paused = vault_.paused_;

  while (!vault_.connected_ && !vault_.stop_syncer_thread_) {
    if (!is_paused && vault_.pause_requested_) {
      // If we get a pause request while waiting for a connection,
      // enter the paused state.
      is_paused = true;
      LOG(INFO) << "Syncer thread entering disconnected pause.";

    // We believed we were paused but the shared flag has been cleared.
    if (is_paused && !vault_.paused_) {
      is_paused = false;
      LOG(INFO) << "Syncer thread exiting disconnected pause.";


  // Only announce the connection when we are not leaving because of shutdown.
  if (!vault_.stop_syncer_thread_) {
    SyncerEvent event(SyncerEvent::CONNECTED);
    LOG(INFO) << "Syncer thread found connection.";

// Keeps the syncer thread parked while |vault_.paused_| is true and
// |vault_.stop_syncer_thread_| is false.
// NOTE(review): the statements guarded by each condition below (entering the
// paused state, the wait itself, leaving the paused state) are not visible in
// this copy of the file -- confirm upstream.
void SyncerThread::PauseUntilResumedOrQuit() {
  LOG(INFO) << "Syncer thread entering pause.";
  // If pause was requested (rather than already being paused), send
  // the PAUSED notification.
  if (vault_.pause_requested_)

  // Thread will get stuck here until either a resume is requested
  // or shutdown is started.
  while (vault_.paused_ && !vault_.stop_syncer_thread_)

  // Notify that we have resumed if we are not shutting down.
  if (!vault_.stop_syncer_thread_)

  LOG(INFO) << "Syncer thread exiting pause.";

// Turns a pending pause into an actual pause: clears
// |vault_.pause_requested_|, sets |vault_.paused_| and builds a PAUSED event.
// NOTE(review): dispatch of |event| is not visible in this copy of the file
// -- confirm upstream.
void SyncerThread::EnterPausedState() {
  vault_.pause_requested_ = false;
  vault_.paused_ = true;
  SyncerEvent event(SyncerEvent::PAUSED);

// Leaves the paused state: clears |vault_.paused_| and builds a RESUMED
// event.
// NOTE(review): dispatch of |event| is not visible in this copy of the file
// -- confirm upstream.
void SyncerThread::ExitPausedState() {
  vault_.paused_ = false;
  SyncerEvent event(SyncerEvent::RESUMED);

// We check how long the user's been idle and sync less often if the machine is
// not in use. The aim is to reduce server load.
// TODO(timsteele): Should use Time(Delta).
//
// |status| supplies updates_available, updates_received, unsynced_count and
//     notifications_enabled.
// |last_poll_wait| is the previous wait, in seconds.
// |user_idle_milliseconds| is in/out: it is lowered when the freshly measured
//     idle time is smaller than the stored value.
// |continue_sync_cycle| is in/out: its incoming value is read, it is reset to
//     false, and it is set to true again in the branch taken when the syncer
//     still has work to do.
// |was_nudged| says whether the sync that just ran was triggered by a nudge.
// Returns the WaitInterval (mode and poll_delta) to use for the next wait.
// Must be called with lock_ held.
// NOTE(review): several continuation lines and closing braces are not visible
// in this copy of the file -- confirm the exact arithmetic upstream.
SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime(
    const AllStatus::Status& status,
    int last_poll_wait,  // Time in seconds.
    int* user_idle_milliseconds,
    bool* continue_sync_cycle,
    bool was_nudged) {
  lock_.AssertAcquired();  // We access 'vault' in here, so we need the lock.
  WaitInterval return_interval;

  // Server initiated throttling trumps everything.
  if (!silenced_until_.is_null()) {
    // We don't need to reset other state, it can continue where it left off.
    return_interval.mode = WaitInterval::THROTTLED;
    return_interval.poll_delta = silenced_until_ - TimeTicks::Now();
    return return_interval;

  // Remember the incoming flag, then clear it; it is only set again further
  // down when there is still work to do.
  bool is_continuing_sync_cyle = *continue_sync_cycle;
  *continue_sync_cycle = false;

  // Determine if the syncer has unfinished work to do from allstatus_.
  const bool syncer_has_work_to_do =
    status.updates_available > status.updates_received
    || status.unsynced_count > 0;
  LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;

  // First calculate the expected wait time, figuring in any backoff because of
  // user idle time.  next_wait is in seconds
  // The short poll interval is selected when notifications are disabled.
  syncer_polling_interval_ = (!status.notifications_enabled) ?
      syncer_short_poll_interval_seconds_ :
  int default_next_wait = syncer_polling_interval_;
  return_interval.poll_delta = TimeDelta::FromSeconds(default_next_wait);

  if (syncer_has_work_to_do) {
    // Provide exponential backoff due to consecutive errors, else attempt to
    // complete the work as soon as possible.
    if (is_continuing_sync_cyle) {
      return_interval.mode = WaitInterval::EXPONENTIAL_BACKOFF;
      if (was_nudged && vault_.current_wait_interval_.mode ==
          WaitInterval::EXPONENTIAL_BACKOFF) {
          // We were nudged, it failed, and we were already in backoff.
          return_interval.had_nudge_during_backoff = true;
          // Keep exponent for exponential backoff the same in this case.
          return_interval.poll_delta = vault_.current_wait_interval_.poll_delta;
      } else {
        // We weren't nudged, or we were in a NORMAL wait interval until now.
        return_interval.poll_delta = TimeDelta::FromSeconds(
    } else {
      // No consecutive error.
      return_interval.poll_delta = TimeDelta::FromSeconds(
    *continue_sync_cycle = true;
  } else if (!status.notifications_enabled) {
    // Ensure that we start exponential backoff from our base polling
    // interval when we are not continuing a sync cycle.
    last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);

    // Did the user start interacting with the computer again?
    // If so, revise our idle time (and probably next_sync_time) downwards
    // Idle detection can be switched off, in which case idle time reads as 0.
    int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
    if (new_idle_time < *user_idle_milliseconds) {
      *user_idle_milliseconds = new_idle_time;
    // CalculateSyncWaitTime() works in milliseconds, hence the * 1000.
    return_interval.poll_delta = TimeDelta::FromMilliseconds(
        CalculateSyncWaitTime(last_poll_wait * 1000,
    DCHECK_GE(return_interval.poll_delta.InSeconds(), default_next_wait);

  LOG(INFO) << "Sync wait: idle " << default_next_wait
            << " non-idle or backoff "
            << return_interval.poll_delta.InSeconds() << ".";

  return return_interval;

// Entry point run on the syncer thread.  Takes lock_ for its whole duration,
// and builds a SYNCER_THREAD_EXITING event once it is done.
// NOTE(review): the signal to Start(), the call into ThreadMainLoop() and the
// dispatch of |event| are not visible in this copy of the file -- confirm
// upstream.
void SyncerThread::ThreadMain() {
  AutoLock lock(lock_);
  // Signal Start() to let it know we've made it safely onto the message loop,
  // and unblock its caller.
  LOG(INFO) << "Syncer thread ThreadMain is done.";
  SyncerEvent event(SyncerEvent::SYNCER_THREAD_EXITING);

// Runs |syncer| by calling SyncShare(this) repeatedly for as long as it
// returns true and no silencing has been recorded in |silenced_until_|.
// lock_ is released (AutoUnlock) while SyncShare runs.
// NOTE(review): the statements preceding the reset of |silenced_until_| are
// not visible in this copy of the file -- confirm upstream.
void SyncerThread::SyncMain(Syncer* syncer) {

  // Since we are initiating a new session for which we are the delegate, we
  // are not currently silenced so reset this state for the next session which
  // may need to use it.
  silenced_until_ = base::TimeTicks();

  // Drop the lock for the duration of the (potentially long) sync.
  AutoUnlock unlock(lock_);
  while (syncer->SyncShare(this) && silenced_until_.is_null()) {
    LOG(INFO) << "Looping in sync share";
  LOG(INFO) << "Done looping in sync share";

// Decides which nudge source, if any, applies to the upcoming sync, consumes
// the pending nudge stored in |vault_|, and passes the result on to
// SetUpdatesSource().
// |was_throttled| stops a pending nudge from being adopted as the source.
// |continue_sync_cycle| selects kContinuation as the source.
// |initial_sync| is forwarded to SetUpdatesSource(), which may clear it.
// Returns whether the upcoming sync counts as nudged.
// NOTE(review): closing braces, and possibly statements in the
// |continue_sync_cycle| branch (e.g. an assignment to |nudged|), are not
// visible in this copy of the file -- confirm upstream.
bool SyncerThread::UpdateNudgeSource(bool was_throttled,
                                     bool continue_sync_cycle,
                                     bool* initial_sync) {
  bool nudged = false;
  NudgeSource nudge_source = kUnknown;
  // Has the previous sync cycle completed?
  if (continue_sync_cycle) {
    nudge_source = kContinuation;
  // Update the nudge source if a new nudge has come through during the
  // previous sync cycle.
  if (!vault_.pending_nudge_time_.is_null()) {
    if (!was_throttled && !nudged) {
      nudge_source = vault_.pending_nudge_source_;
      nudged = true;
    LOG(INFO) << "Clearing pending nudge from "
              << vault_.pending_nudge_source_
              << " at tick "
              << vault_.pending_nudge_time_.ToInternalValue();
    // Reset the pending nudge to its "none" state.
    vault_.pending_nudge_source_ = kUnknown;
    vault_.pending_nudge_time_ = base::TimeTicks();
  SetUpdatesSource(nudged, nudge_source, initial_sync);
  return nudged;

// Maps the nudge state onto a sync_pb::GetUpdatesCallerInfo source value:
// FIRST_UPDATE for the initial sync (clearing |*initial_sync|), PERIODIC when
// the sync was not nudged, otherwise a value chosen from |nudge_source|.
// NOTE(review): the initializer of |updates_source|, the statement that stores
// the chosen value, and any `break` statements between the switch cases are
// not visible in this copy of the file.  As shown, every case would fall
// through to UNKNOWN -- confirm against upstream.
void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
                                    bool* initial_sync) {
  sync_pb::GetUpdatesCallerInfo::GetUpdatesSource updates_source =
  if (*initial_sync) {
    updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
    *initial_sync = false;
  } else if (!nudged) {
    updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
  } else {
    switch (nudge_source) {
      case kNotification:
        updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
      case kLocal:
        updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
      case kContinuation:
        updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
      case kUnknown:
        updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;

// SyncerEvent listener.  Under lock_, schedules a nudge of unknown source
// using the delay carried by |event|.
// NOTE(review): the body of the REQUEST_SYNC_NUDGE check is not visible in
// this copy of the file; presumably other event types return early -- confirm
// upstream.
void SyncerThread::HandleChannelEvent(const SyncerEvent& event) {
  AutoLock lock(lock_);
  if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
  NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);

// Directory manager listener.  On an OPENED event, creates the Syncer (under
// lock_) from |session_context_|.  The CHECK enforces that no Syncer exists
// yet.
// NOTE(review): any statements after the Syncer is created (e.g. waking the
// thread waiting in ThreadMainLoop()) are not visible in this copy of the file
// -- confirm upstream.
void SyncerThread::HandleDirectoryManagerEvent(
    const syncable::DirectoryManagerEvent& event) {
  LOG(INFO) << "Handling a directory manager event";
  if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
    AutoLock lock(lock_);
    LOG(INFO) << "Syncer starting up for: " << event.dirname;
    // The underlying database structure is ready, and we should create
    // the syncer.
    CHECK(vault_.syncer_ == NULL);
    vault_.syncer_ = new Syncer(session_context_.get());


// Sets |*connected| to false if it is currently true but |code| suggests that
// the current network configuration and/or auth state cannot be used to make
// forward progress, and user intervention (e.g changing server URL or auth
// credentials) is likely necessary.  If |*connected| is false, set it to true
// if |code| suggests that we just recently made healthy contact with the
// server.
// NOTE(review): |condvar| is not referenced by any line visible in this copy
// of the file; presumably it is signalled when the state flips -- confirm
// upstream.
static inline void CheckConnected(bool* connected,
                                  HttpResponse::ServerConnectionCode code,
                                  ConditionVariable* condvar) {
  if (*connected) {
    // Note, be careful when adding cases here because if the SyncerThread
    // thinks there is no valid connection as determined by this method, it
    // will drop out of *all* forward progress sync loops (it won't poll and it
    // will queue up Talk notifications but not actually call SyncShare) until
    // some external action causes a ServerConnectionManager to broadcast that
    // a valid connection has been re-established.
    if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
        HttpResponse::SYNC_AUTH_ERROR == code) {
      *connected = false;
  } else {
    // Currently disconnected: only an explicit OK status reconnects us.
    if (HttpResponse::SERVER_CONNECTION_OK == code) {
      *connected = true;

// Registers this object as a listener on |conn_mgr|'s channel (the hookup is
// kept in |conn_mgr_hookup_|) and seeds |vault_.connected_| from the manager's
// current server status via CheckConnected().
// NOTE(review): the trailing arguments of both calls are not visible in this
// copy of the file -- confirm upstream.
void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
  conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
  CheckConnected(&vault_.connected_, conn_mgr->server_status(),

// Server connection listener.  On STATUS_CHANGED, re-evaluates
// |vault_.connected_| from the event's connection code while holding lock_.
// NOTE(review): the final argument to CheckConnected() is not visible in this
// copy of the file -- confirm upstream.
void SyncerThread::HandleServerConnectionEvent(
    const ServerConnectionEvent& event) {
  if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
    AutoLock lock(lock_);
    CheckConnected(&vault_.connected_, event.connection_code,

// Returns the raw pointer to the event channel created in the constructor.
// The pointer is obtained with get(), so |syncer_event_relay_channel_| keeps
// holding it.
SyncerEventChannel* SyncerThread::relay_channel() {
  return syncer_event_relay_channel_.get();

// Inputs and return value in milliseconds.
// Returns the base polling interval unless the user has been idle for at
// least kPollBackoffThresholdMultiplier polling intervals, in which case the
// wait is AllStatus::GetRecommendedDelaySeconds(|last_interval| in seconds),
// capped at |syncer_max_interval_|.
// NOTE(review): |syncer_max_interval_| is compared against millisecond values
// and divided by 1000 below, so it appears to be in milliseconds -- confirm.
int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
  // syncer_polling_interval_ is in seconds
  int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;

  // This is our default and lower bound.
  int next_wait = syncer_polling_interval_ms;

  // Get idle time, bounded by max wait.
  int idle = min(user_idle_ms, syncer_max_interval_);

  // If the user has been idle for a while, we'll start decreasing the poll
  // rate.
  if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
    // Work in seconds for the recommended-delay helper, then convert back.
    next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
        last_interval / 1000), syncer_max_interval_ / 1000) * 1000;

  return next_wait;

// Called with lock_ already held (both NudgeSyncer() and HandleChannelEvent()
// acquire it before calling in).
// Records a pending nudge from |source| due |milliseconds_from_now| ms from
// now in |vault_|.  The visible checks cover being throttled, having already
// had a nudge during the current backoff stage, and an existing pending nudge
// whose time is at or after the new one.
// NOTE(review): the early returns, the remainder of the nudge_time
// expression, and any wake-up of the waiting thread are not visible in this
// copy of the file -- confirm upstream.
void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
                                 NudgeSource source) {
  // TODO(sync): Add the option to reset the backoff state machine.
  // This is needed so nudges that are a result of the user's desire
  // to download updates for a new data type can be satisfied quickly.
  if (vault_.current_wait_interval_.mode == WaitInterval::THROTTLED ||
      vault_.current_wait_interval_.had_nudge_during_backoff) {
    // Drop nudges on the floor if we've already had one since starting this
    // stage of exponential backoff or we are throttled.

  const TimeTicks nudge_time = TimeTicks::Now() +
  if (nudge_time <= vault_.pending_nudge_time_) {
    LOG(INFO) << "Nudge for source " << source
              << " dropped due to existing later pending nudge";

  LOG(INFO) << "Replacing pending nudge for source "
            << source << " at " << nudge_time.ToInternalValue();
  vault_.pending_nudge_source_ = source;
  vault_.pending_nudge_time_ = nudge_time;

// Acquires lock_ before acting on |notifications_enabled|.
// NOTE(review): the statement that uses |notifications_enabled| is not visible
// in this copy of the file -- confirm upstream.
void SyncerThread::SetNotificationsEnabled(bool notifications_enabled) {
  AutoLock lock(lock_);

// Returns the amount of time since the user last interacted with the computer,
// in milliseconds
// The implementation is chosen per platform: GetLastInputInfo() on Windows,
// the IOHIDSystem "HIDIdleTime" registry property on Mac, and |idle_query_|
// on Linux.  Returns 0 whenever the idle time cannot be determined.
// NOTE(review): several continuation lines, closing braces and preprocessor
// directives (#else/#endif) are not visible in this copy of the file --
// confirm against upstream.
int SyncerThread::UserIdleTime() {
#if defined(OS_WIN)
  LASTINPUTINFO last_input_info;
  last_input_info.cbSize = sizeof(LASTINPUTINFO);

  // Get time in windows ticks since system start of last activity.
  BOOL b = ::GetLastInputInfo(&last_input_info);
  if (b == TRUE)
    // Both values are tick counts in milliseconds since system start.
    return ::GetTickCount() - last_input_info.dwTime;
#elif defined(OS_MACOSX)
  // It would be great to do something like:
  // return 1000 *
  //     CGEventSourceSecondsSinceLastEventType(
  //         kCGEventSourceStateCombinedSessionState,
  //         kCGAnyInputEventType);
  // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
  // and can't link that high up the food chain. Thus this mucking in IOKit.

  io_service_t hid_service =
  if (!hid_service) {
    LOG(WARNING) << "Could not obtain IOHIDSystem";
    return 0;

  // NOTE(review): IORegistryEntryCreateCFProperty returns an object the caller
  // owns; no CFRelease(object) or IOObjectRelease(hid_service) is visible in
  // this copy of the file -- confirm they exist upstream.
  CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
  if (!object) {
    LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
    return 0;

  int64 idle_time;  // in nanoseconds
  Boolean success = false;
  // Only attempt the numeric conversion when the property really is a
  // CFNumber.
  if (CFGetTypeID(object) == CFNumberGetTypeID()) {
    success = CFNumberGetValue((CFNumberRef)object,
  } else {
    LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";


  if (!success) {
    LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
    return 0;
  } else {
    return idle_time / 1000000;  // nano to milli
#elif defined(OS_LINUX)
  // |idle_query_| is created in ThreadMainLoop(); report 0 until it exists.
  if (idle_query_.get()) {
    return idle_query_->IdleTime();
  } else {
    return 0;
  // Fallback for other platforms: log once, then always report 0.
  static bool was_logged = false;
  if (!was_logged) {
    was_logged = true;
    LOG(INFO) << "UserIdleTime unimplemented on this platform, "
        "synchronization will not throttle when user idle";

  return 0;

}  // namespace browser_sync

Generated by  Doxygen 1.6.0   Back to index