Bug 805478 - NetdClient and VolumeManager inherit MessageLoopForIO::LineWatcher. r=dhylands
authorSteven Lee <slee@mozilla.com>
Thu, 08 Nov 2012 14:35:02 -0500
changeset 120665 9f483818db4400794e00a34cbe9ad406bd74dd17
parent 120664 2795b0a82775a150142336e09305f780cf4f9d73
child 120666 006e5f86f6a8f5a1ee5e68002870abd289816464
push id1997
push userakeybl@mozilla.com
push dateMon, 07 Jan 2013 21:25:26 +0000
treeherdermozilla-beta@4baf45cdcf21 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdhylands
bugs805478
milestone19.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 805478 - NetdClient and VolumeManager inherit MessageLoopForIO::LineWatcher. r=dhylands
dom/system/gonk/VolumeManager.cpp
dom/system/gonk/VolumeManager.h
ipc/netd/Netd.cpp
ipc/netd/Netd.h
--- a/dom/system/gonk/VolumeManager.cpp
+++ b/dom/system/gonk/VolumeManager.cpp
@@ -27,19 +27,19 @@ namespace system {
 static StaticRefPtr<VolumeManager> sVolumeManager;
 
 VolumeManager::STATE VolumeManager::mState = VolumeManager::UNINITIALIZED;
 VolumeManager::StateObserverList VolumeManager::mStateObserverList;
 
 /***************************************************************************/
 
 VolumeManager::VolumeManager()
-  : mSocket(-1),
-    mCommandPending(false),
-    mRcvIdx(0)
+  : LineWatcher('\0', kRcvBufSize),
+    mSocket(-1),
+    mCommandPending(false)
 {
   DBG("VolumeManager constructor called");
 }
 
 VolumeManager::~VolumeManager()
 {
 }
 
@@ -267,85 +267,47 @@ VolumeManager::WriteCommandData()
                           &mWriteWatcher,
                           this)) {
     ERR("Failed to setup write watcher for vold socket");
     Restart();
   }
 }
 
 void
-VolumeManager::OnFileCanReadWithoutBlocking(int aFd)
+VolumeManager::OnLineRead(int aFd, nsDependentCSubstring& aMessage)
 {
   MOZ_ASSERT(aFd == mSocket.get());
-  while (true) {
-    ssize_t bytesRemaining = read(aFd, &mRcvBuf[mRcvIdx], sizeof(mRcvBuf) - mRcvIdx);
-    if (bytesRemaining < 0) {
-      if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
-        return;
-      }
-      if (errno == EINTR) {
-        continue;
-      }
-      ERR("Unknown read error: %d (%s) - restarting", errno, strerror(errno));
-      Restart();
-      return;
-    }
-    if (bytesRemaining == 0) {
-      // This means that vold probably crashed
-      ERR("Vold appears to have crashed - restarting");
-      Restart();
-      return;
-    }
-    // We got some data. Each line is terminated by a null character
-    DBG("Read %ld bytes", bytesRemaining);
-    while (bytesRemaining > 0) {
-      bytesRemaining--;
-      if (mRcvBuf[mRcvIdx] == '\0') {
-        // We found a line terminator. Each line is formatted as an
-        // integer response code followed by the rest of the line.
-        // Fish out the response code.
-        char *endPtr;
-        int responseCode = strtol(mRcvBuf, &endPtr, 10);
-        if (*endPtr == ' ') {
-          endPtr++;
-        }
+  char *endPtr;
+  int responseCode = strtol(aMessage.Data(), &endPtr, 10);
+  if (*endPtr == ' ') {
+    endPtr++;
+  }
+
+  // Now fish out the rest of the line after the response code
+  nsDependentCString  responseLine(endPtr, aMessage.Length() - (endPtr - aMessage.Data()));
+  DBG("Rcvd: %d '%s'", responseCode, responseLine.Data());
 
-        // Now fish out the rest of the line after the response code
-        nsDependentCString  responseLine(endPtr, &mRcvBuf[mRcvIdx] - endPtr);
-        DBG("Rcvd: %d '%s'", responseCode, responseLine.Data());
-
-        if (responseCode >= ResponseCode::UnsolicitedInformational) {
-          // These are unsolicited broadcasts. We intercept these and process
-          // them ourselves
-          HandleBroadcast(responseCode, responseLine);
-        } else {
-          // Everything else is considered to be part of the command response.
-          if (mCommands.size() > 0) {
-            VolumeCommand *cmd = mCommands.front();
-            cmd->HandleResponse(responseCode, responseLine);
-            if (responseCode >= ResponseCode::CommandOkay) {
-              // That's a terminating response. We can remove the command.
-              mCommands.pop();
-              mCommandPending = false;
-              // Start the next command, if there is one.
-              WriteCommandData();
-            }
-          } else {
-            ERR("Response with no command");
-          }
-        }
-        if (bytesRemaining > 0) {
-          // There is data in the receive buffer beyond the current line.
-          // Shift it down to the beginning.
-          memmove(&mRcvBuf[0], &mRcvBuf[mRcvIdx + 1], bytesRemaining);
-        }
-        mRcvIdx = 0;
-      } else {
-        mRcvIdx++;
+  if (responseCode >= ResponseCode::UnsolicitedInformational) {
+    // These are unsolicited broadcasts. We intercept these and process
+    // them ourselves
+    HandleBroadcast(responseCode, responseLine);
+  } else {
+    // Everything else is considered to be part of the command response.
+    if (mCommands.size() > 0) {
+      VolumeCommand *cmd = mCommands.front();
+      cmd->HandleResponse(responseCode, responseLine);
+      if (responseCode >= ResponseCode::CommandOkay) {
+        // That's a terminating response. We can remove the command.
+        mCommands.pop();
+        mCommandPending = false;
+        // Start the next command, if there is one.
+        WriteCommandData();
       }
+    } else {
+      ERR("Response with no command");
     }
   }
 }
 
 void
 VolumeManager::OnFileCanWriteWithoutBlocking(int aFd)
 {
   MOZ_ASSERT(aFd == mSocket.get());
@@ -377,17 +339,16 @@ VolumeManager::Restart()
   mReadWatcher.StopWatchingFileDescriptor();
   mWriteWatcher.StopWatchingFileDescriptor();
 
   while (!mCommands.empty()) {
     mCommands.pop();
   }
   mCommandPending = false;
   mSocket.dispose();
-  mRcvIdx = 0;
   Start();
 }
 
 //static
 void
 VolumeManager::Start()
 {
   MOZ_ASSERT(MessageLoop::current() == XRE_GetIOMessageLoop());
@@ -400,16 +361,22 @@ VolumeManager::Start()
     // Socket open failed, try again in a second.
     MessageLoopForIO::current()->
       PostDelayedTask(FROM_HERE,
                       NewRunnableFunction(VolumeManager::Start),
                       1000);
   }
 }
 
+void
+VolumeManager::OnError()
+{
+  Restart();
+}
+
 /***************************************************************************/
 
 static void
 InitVolumeManagerIOThread()
 {
   MOZ_ASSERT(MessageLoop::current() == XRE_GetIOMessageLoop());
   MOZ_ASSERT(!sVolumeManager);
 
--- a/dom/system/gonk/VolumeManager.h
+++ b/dom/system/gonk/VolumeManager.h
@@ -68,17 +68,17 @@ namespace system {
 *   There is also a command line tool called vdc, which can be used to send
 *   the above commands to vold.
 *
 *   Currently, only the volume list, share/unshare, and mount/unmount
 *   commands are being used.
 *
 ***************************************************************************/
 
-class VolumeManager : public MessageLoopForIO::Watcher,
+class VolumeManager : public MessageLoopForIO::LineWatcher,
                       public RefCounted<VolumeManager>
 {
 public:
 
   typedef nsTArray<RefPtr<Volume> > VolumeArray;
 
   VolumeManager();
   virtual ~VolumeManager();
@@ -126,18 +126,19 @@ public:
   static TemporaryRef<Volume> GetVolume(VolumeArray::index_type aIndex);
   static TemporaryRef<Volume> FindVolumeByName(const nsCSubstring &aName);
   static TemporaryRef<Volume> FindAddVolumeByName(const nsCSubstring &aName);
 
   static void       PostCommand(VolumeCommand *aCommand);
 
 protected:
 
-  virtual void OnFileCanReadWithoutBlocking(int aFd);
+  virtual void OnLineRead(int aFd, nsDependentCSubstring& aMessage);
   virtual void OnFileCanWriteWithoutBlocking(int aFd);
+  virtual void OnError();
 
 private:
   bool OpenSocket();
 
   friend class VolumeListCallback; // Calls SetState
 
   static void SetState(STATE aNewState);
 
@@ -150,18 +151,16 @@ private:
   static STATE              mState;
   static StateObserverList  mStateObserverList;
 
   static const int    kRcvBufSize = 1024;
   ScopedClose         mSocket;
   VolumeArray         mVolumeArray;
   CommandQueue        mCommands;
   bool                mCommandPending;
-  char                mRcvBuf[kRcvBufSize];
-  size_t              mRcvIdx;
   MessageLoopForIO::FileDescriptorWatcher mReadWatcher;
   MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
   RefPtr<VolumeResponseCallback>          mBroadcastCallback;
 };
 
 /***************************************************************************
 *
 *   The initialization/shutdown functions do not need to be called from
--- a/ipc/netd/Netd.cpp
+++ b/ipc/netd/Netd.cpp
@@ -79,20 +79,20 @@ InitRndisAddress()
 }
 
 } // anonymous namespace
 
 namespace mozilla {
 namespace ipc {
 
 NetdClient::NetdClient()
-  : mSocket(INVALID_SOCKET)
+  : LineWatcher('\0', MAX_COMMAND_SIZE)
+  , mSocket(INVALID_SOCKET)
   , mIOLoop(MessageLoopForIO::current())
   , mCurrentWriteOffset(0)
-  , mReceivedIndex(0)
   , mReConnectTimes(0)
 {
   MOZ_COUNT_CTOR(NetdClient);
 }
 
 NetdClient::~NetdClient()
 {
   MOZ_COUNT_DTOR(NetdClient);
@@ -142,93 +142,54 @@ NetdClient::OpenSocket()
                           &mWriteWatcher,
                           this);
   }
 
   LOG("Connected to netd");
   return true;
 }
 
-void
-NetdClient::OnFileCanReadWithoutBlocking(int aFd)
+void NetdClient::OnLineRead(int aFd, nsDependentCSubstring& aMessage)
 {
-  ssize_t length = 0;
-
-  MOZ_ASSERT(aFd == mSocket.get());
-  while (true) {
-    errno = 0;
-    MOZ_ASSERT(mReceivedIndex < MAX_COMMAND_SIZE);
-    length = read(aFd, &mReceiveBuffer[mReceivedIndex], MAX_COMMAND_SIZE - mReceivedIndex);
-    MOZ_ASSERT(length <= ssize_t(MAX_COMMAND_SIZE - mReceivedIndex));
-    if (length <= 0) {
-      if (length == -1) {
-        if (errno == EINTR) {
-          continue; // retry system call when interrupted
-        }
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-          return; // no data available: return and re-poll
-        }
-      }
-      LOG("Can't read from netd error: %d (%s) length: %d", errno, strerror(errno), length);
-      // At this point, assume that we can't actually access
-      // the socket anymore, and start a reconnect loop.
-      Restart();
-      return;
-    }
+  // Set errno to 0 first. For preventing to use the stale version of errno.
+  errno = 0;
+  // We found a line terminator. Each line is formatted as an
+  // integer response code followed by the rest of the line.
+  // Fish out the response code.
+  int responseCode = strtol(aMessage.Data(), nullptr, 10);
+  // TODO, Bug 783966, handle InterfaceChange(600) and BandwidthControl(601).
+  if (!errno && responseCode < 600) {
+    NetdCommand* response = new NetdCommand();
+    // Passing all the response message, including the line terminator.
+    response->mSize = aMessage.Length();
+    memcpy(response->mData, aMessage.Data(), aMessage.Length());
+    gNetdConsumer->MessageReceived(response);
+  }
 
-    while (length-- > 0) {
-      MOZ_ASSERT(mReceivedIndex < MAX_COMMAND_SIZE);
-      if (mReceiveBuffer[mReceivedIndex] == '\0') {
-        // We found a line terminator. Each line is formatted as an
-        // integer response code followed by the rest of the line.
-        // Fish out the response code.
-        errno = 0;
-        int responseCode = strtol(mReceiveBuffer, nullptr, 10);
-        // TODO, Bug 783966, handle InterfaceChange(600) and BandwidthControl(601).
-        if (!errno && responseCode < 600) {
-          NetdCommand* response = new NetdCommand();
-          // Passing all the response message, including the line terminator.
-          response->mSize = mReceivedIndex + 1;
-          memcpy(response->mData, mReceiveBuffer, mReceivedIndex + 1);
-          gNetdConsumer->MessageReceived(response);
-        }
-        if (!responseCode || errno) {
-          LOG("Can't parse netd's response: %d (%s)", errno, strerror(errno));
-        }
-        // There is data in the receive buffer beyond the current line.
-        // Shift it down to the beginning.
-        if (length > 0) {
-          MOZ_ASSERT(mReceivedIndex < (MAX_COMMAND_SIZE - 1));
-          memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length);
-        }
-        mReceivedIndex = 0;
-      } else {
-        mReceivedIndex++;
-      }
-    }
+  if (!responseCode) {
+    LOG("Can't parse netd's response");
   }
 }
 
 void
 NetdClient::OnFileCanWriteWithoutBlocking(int aFd)
 {
   MOZ_ASSERT(aFd == mSocket.get());
   WriteNetdCommand();
 }
 
 void
-NetdClient::Restart()
+NetdClient::OnError()
 {
   MOZ_ASSERT(MessageLoop::current() == XRE_GetIOMessageLoop());
 
   mReadWatcher.StopWatchingFileDescriptor();
   mWriteWatcher.StopWatchingFileDescriptor();
 
   mSocket.dispose();
-  mReceivedIndex = 0;
   mCurrentWriteOffset = 0;
   mCurrentNetdCommand = nullptr;
   while (!mOutgoingQ.empty()) {
     delete mOutgoingQ.front();
     mOutgoingQ.pop();
   }
   Start();
 }
@@ -294,17 +255,17 @@ NetdClient::WriteNetdCommand()
 
   while (mCurrentWriteOffset < mCurrentNetdCommand->mSize) {
     ssize_t write_amount = mCurrentNetdCommand->mSize - mCurrentWriteOffset;
     ssize_t written = write(mSocket.get(),
                             mCurrentNetdCommand->mData + mCurrentWriteOffset,
                             write_amount);
     if (written < 0) {
       LOG("Cannot write to network, error %d\n", (int) written);
-      Restart();
+      OnError();
       return;
     }
 
     if (written > 0) {
       mCurrentWriteOffset += written;
     }
 
     if (written != write_amount) {
--- a/ipc/netd/Netd.h
+++ b/ipc/netd/Netd.h
@@ -33,43 +33,41 @@ public:
   virtual void MessageReceived(NetdCommand* aMessage) = 0;
 };
 
 class NetdWriteTask : public Task
 {
   virtual void Run();
 };
 
-class NetdClient : public MessageLoopForIO::Watcher,
+class NetdClient : public MessageLoopForIO::LineWatcher,
                    public RefCounted<NetdClient>
 {
 public:
   typedef std::queue<NetdCommand*> NetdCommandQueue;
 
   NetdClient();
   virtual ~NetdClient();
   static void Start();
   static void SendNetdCommandIOThread(NetdCommand* aMessage);
 
 private:
   void WriteNetdCommand();
-  void Restart();
-  virtual void OnFileCanReadWithoutBlocking(int aFd);
+  virtual void OnError();
+  virtual void OnLineRead(int aFd, nsDependentCSubstring& aMessage);
   virtual void OnFileCanWriteWithoutBlocking(int aFd);
   bool OpenSocket();
 
   MessageLoopForIO *mIOLoop;
   MessageLoopForIO::FileDescriptorWatcher mReadWatcher;
   MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
   ScopedClose mSocket;
   NetdCommandQueue mOutgoingQ;
-  char mReceiveBuffer[MAX_COMMAND_SIZE];
   nsAutoPtr<NetdCommand> mCurrentNetdCommand;
   size_t mCurrentWriteOffset;
-  size_t mReceivedIndex;
   size_t mReConnectTimes;
 };
 
 void StartNetd(NetdConsumer *);
 void StopNetd();
 void SendNetdCommand(NetdCommand *);
 
 } // namespace ipc