Bug 826931 - Part 3/3: use mozilla::ipc::UnixSocket. r=qDot
authorVicamo Yang <vyang@mozilla.com>
Fri, 01 Feb 2013 20:28:22 +0800
changeset 130456 8cf5d79350603f6b22ca05f556a1262df542a57d
parent 130455 f702f4adea068537a97eece6383eda6bd9226a5e
child 130457 71021f2374545b00ab23786060e3f7633427bd6b
push id2323
push userbbajaj@mozilla.com
push dateMon, 01 Apr 2013 19:47:02 +0000
treeherdermozilla-beta@7712be144d91 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersqDot
bugs826931
milestone21.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 826931 - Part 3/3: use mozilla::ipc::UnixSocket. r=qDot
dom/system/gonk/SystemWorkerManager.cpp
dom/system/gonk/SystemWorkerManager.h
ipc/ril/Ril.cpp
ipc/ril/Ril.h
--- a/dom/system/gonk/SystemWorkerManager.cpp
+++ b/dom/system/gonk/SystemWorkerManager.cpp
@@ -59,29 +59,46 @@ NS_DEFINE_CID(kNetworkManagerCID, NS_NET
 SystemWorkerManager *gInstance = nullptr;
 
 class ConnectWorkerToRIL : public WorkerTask
 {
 public:
   virtual bool RunTask(JSContext *aCx);
 };
 
+class SendRilSocketDataTask : public nsRunnable
+{
+public:
+  SendRilSocketDataTask(UnixSocketRawData *aRawData)
+    : mRawData(aRawData)
+  { }
+
+  NS_IMETHOD Run()
+  {
+    MOZ_ASSERT(NS_IsMainThread());
+    SystemWorkerManager::SendRilRawData(mRawData);
+    return NS_OK;
+  }
+
+private:
+  UnixSocketRawData *mRawData;
+};
+
 JSBool
 PostToRIL(JSContext *cx, unsigned argc, jsval *vp)
 {
   NS_ASSERTION(!NS_IsMainThread(), "Expecting to be on the worker thread");
 
   if (argc != 1) {
     JS_ReportError(cx, "Expecting a single argument with the RIL message");
     return false;
   }
 
   jsval v = JS_ARGV(cx, vp)[0];
 
-  nsAutoPtr<RilRawData> rm(new RilRawData());
   JSAutoByteString abs;
   void *data;
   size_t size;
   if (JSVAL_IS_STRING(v)) {
     JSString *str = JSVAL_TO_STRING(v);
     if (!abs.encode(cx, str)) {
       return false;
     }
@@ -106,87 +123,37 @@ PostToRIL(JSContext *cx, unsigned argc, 
     size = JS_GetTypedArrayByteLength(obj);
     data = JS_GetArrayBufferViewData(obj);
   } else {
     JS_ReportError(cx,
                    "Incorrect argument. Expecting a string or a typed array");
     return false;
   }
 
-  if (size > RilRawData::MAX_DATA_SIZE) {
-    JS_ReportError(cx, "Passed-in data is too large");
-    return false;
-  }
+  UnixSocketRawData* raw = new UnixSocketRawData(size);
+  memcpy(raw->mData, data, raw->mSize);
 
-  rm->mSize = size;
-  memcpy(rm->mData, data, size);
-
-  RilRawData *tosend = rm.forget();
-  JS_ALWAYS_TRUE(SendRilRawData(&tosend));
+  nsRefPtr<SendRilSocketDataTask> task = new SendRilSocketDataTask(raw);
+  NS_DispatchToMainThread(task);
   return true;
 }
 
 bool
 ConnectWorkerToRIL::RunTask(JSContext *aCx)
 {
   // Set up the postRILMessage on the function for worker -> RIL thread
   // communication.
   NS_ASSERTION(!NS_IsMainThread(), "Expecting to be on the worker thread");
   NS_ASSERTION(!JS_IsRunning(aCx), "Are we being called somehow?");
   JSObject *workerGlobal = JS_GetGlobalObject(aCx);
 
   return !!JS_DefineFunction(aCx, workerGlobal, "postRILMessage", PostToRIL, 1,
                              0);
 }
 
-class RILReceiver : public RilConsumer
-{
-  class DispatchRILEvent : public WorkerTask
-  {
-  public:
-    DispatchRILEvent(RilRawData *aMessage)
-      : mMessage(aMessage)
-    { }
-
-    virtual bool RunTask(JSContext *aCx);
-
-  private:
-    nsAutoPtr<RilRawData> mMessage;
-  };
-
-public:
-  RILReceiver(WorkerCrossThreadDispatcher *aDispatcher)
-    : mDispatcher(aDispatcher)
-  { }
-
-  virtual void MessageReceived(RilRawData *aMessage) {
-    nsRefPtr<DispatchRILEvent> dre(new DispatchRILEvent(aMessage));
-    mDispatcher->PostTask(dre);
-  }
-
-private:
-  nsRefPtr<WorkerCrossThreadDispatcher> mDispatcher;
-};
-
-bool
-RILReceiver::DispatchRILEvent::RunTask(JSContext *aCx)
-{
-  JSObject *obj = JS_GetGlobalObject(aCx);
-
-  JSObject *array = JS_NewUint8Array(aCx, mMessage->mSize);
-  if (!array) {
-    return false;
-  }
-
-  memcpy(JS_GetArrayBufferViewData(array), mMessage->mData, mMessage->mSize);
-  jsval argv[] = { OBJECT_TO_JSVAL(array) };
-  return JS_CallFunctionName(aCx, obj, "onRILMessage", NS_ARRAY_LENGTH(argv),
-                             argv, argv);
-}
-
 #ifdef MOZ_WIDGET_GONK
 
 JSBool
 DoNetdCommand(JSContext *cx, unsigned argc, jsval *vp)
 {
   NS_ASSERTION(!NS_IsMainThread(), "Expecting to be on the worker thread");
 
   if (argc != 1) {
@@ -401,17 +368,20 @@ SystemWorkerManager::Shutdown()
   NS_ASSERTION(NS_IsMainThread(), "Wrong thread!");
 
   mShutdown = true;
 
 #ifdef MOZ_WIDGET_GONK
   ShutdownAutoMounter();
 #endif
 
-  StopRil();
+  if (mRilConsumer) {
+    mRilConsumer->Shutdown();
+    mRilConsumer = nullptr;
+  }
 
 #ifdef MOZ_WIDGET_GONK
   StopNetd();
   mNetdWorker = nullptr;
 #endif
 
   nsCOMPtr<nsIWifi> wifi(do_QueryInterface(mWifiWorker));
   if (wifi) {
@@ -450,16 +420,27 @@ SystemWorkerManager::FactoryCreate()
 
 // static
 nsIInterfaceRequestor*
 SystemWorkerManager::GetInterfaceRequestor()
 {
   return gInstance;
 }
 
+bool
+SystemWorkerManager::SendRilRawData(UnixSocketRawData* aRaw)
+{
+  if (!gInstance->mRilConsumer) {
+    // Probably shuting down.
+    delete aRaw;
+    return true;
+  }
+  return gInstance->mRilConsumer->SendSocketData(aRaw);
+}
+
 NS_IMETHODIMP
 SystemWorkerManager::GetInterface(const nsIID &aIID, void **aResult)
 {
   NS_ASSERTION(NS_IsMainThread(), "Wrong thread!");
 
   if (aIID.Equals(NS_GET_IID(nsIWifi))) {
     return CallQueryInterface(mWifiWorker,
                               reinterpret_cast<nsIWifi**>(aResult));
@@ -492,18 +473,17 @@ SystemWorkerManager::RegisterRilWorker(c
   }
 
   nsRefPtr<ConnectWorkerToRIL> connection = new ConnectWorkerToRIL();
   if (!wctd->PostTask(connection)) {
     return NS_ERROR_UNEXPECTED;
   }
 
   // Now that we're set up, connect ourselves to the RIL thread.
-  mozilla::RefPtr<RILReceiver> receiver = new RILReceiver(wctd);
-  StartRil(receiver);
+  mRilConsumer = new RilConsumer(wctd);
   return NS_OK;
 }
 
 #ifdef MOZ_WIDGET_GONK
 nsresult
 SystemWorkerManager::InitNetd(JSContext *cx)
 {
   nsCOMPtr<nsIWorkerHolder> worker = do_GetService(kNetworkManagerCID);
--- a/dom/system/gonk/SystemWorkerManager.h
+++ b/dom/system/gonk/SystemWorkerManager.h
@@ -26,16 +26,22 @@
 #include "nsDebug.h"
 #include "nsDOMEventTargetHelper.h"
 #include "nsStringGlue.h"
 #include "nsTArray.h"
 
 class nsIWorkerHolder;
 
 namespace mozilla {
+
+namespace ipc {
+  class RilConsumer;
+  class UnixSocketRawData;
+}
+
 namespace dom {
 namespace gonk {
 
 class SystemWorkerManager : public nsIObserver,
                             public nsIInterfaceRequestor,
                             public nsISystemWorkerManager
 {
 public:
@@ -48,30 +54,34 @@ public:
   void Shutdown();
 
   static already_AddRefed<SystemWorkerManager>
   FactoryCreate();
 
   static nsIInterfaceRequestor*
   GetInterfaceRequestor();
 
+  static bool SendRilRawData(ipc::UnixSocketRawData* aRaw);
+
 private:
   SystemWorkerManager();
   ~SystemWorkerManager();
 
 #ifdef MOZ_WIDGET_GONK
   nsresult InitNetd(JSContext *cx);
 #endif
   nsresult InitWifi(JSContext *cx);
 
 #ifdef MOZ_WIDGET_GONK
   nsCOMPtr<nsIWorkerHolder> mNetdWorker;
 #endif
   nsCOMPtr<nsIWorkerHolder> mWifiWorker;
 
+  nsRefPtr<ipc::RilConsumer> mRilConsumer;
+
   bool mShutdown;
 };
 
 }
 }
 }
 
 #endif // mozilla_dom_system_b2g_systemworkermanager_h__
--- a/ipc/ril/Ril.cpp
+++ b/ipc/ril/Ril.cpp
@@ -1,426 +1,212 @@
 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
 /* vim: set sw=4 ts=8 et ft=cpp: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #include <fcntl.h>
-#include <unistd.h>
-
-#include <queue>
-
 #include <sys/socket.h>
 #include <sys/un.h>
-#include <sys/select.h>
-#include <sys/types.h>
-
-#include "base/eintr_wrapper.h"
-#include "base/message_loop.h"
-#include "mozilla/FileUtils.h"
-#include "mozilla/Monitor.h"
-#include "mozilla/Util.h"
-#include "nsAutoPtr.h"
-#include "nsIThread.h"
-#include "nsXULAppAPI.h"
-#include "Ril.h"
 
 #undef LOG
 #if defined(MOZ_WIDGET_GONK)
 #include <android/log.h>
 #define LOG(args...)  __android_log_print(ANDROID_LOG_INFO, "Gonk", args)
 #else
 #define LOG(args...)  printf(args);
 #endif
 
-#define RIL_SOCKET_NAME "/dev/socket/rilproxy"
+#include "jsfriendapi.h"
+#include "nsThreadUtils.h" // For NS_IsMainThread.
+#include "Ril.h"
 
-using namespace base;
-using namespace std;
+USING_WORKERS_NAMESPACE
+using namespace mozilla::ipc;
+
+namespace {
+
+const char* RIL_SOCKET_NAME = "/dev/socket/rilproxy";
 
 // Network port to connect to for adb forwarded sockets when doing
 // desktop development.
 const uint32_t RIL_TEST_PORT = 6200;
 
-namespace mozilla {
-namespace ipc {
-
-struct RilClient : public RefCounted<RilClient>,
-                   public MessageLoopForIO::Watcher
-
+class DispatchRILEvent : public WorkerTask
 {
-    typedef queue<RilRawData*> RilRawDataQueue;
-
-    RilClient() : mSocket(-1)
-                , mMutex("RilClient.mMutex")
-                , mBlockedOnWrite(false)
-                , mIOLoop(MessageLoopForIO::current())
-                , mCurrentRilRawData(NULL)
+public:
+    DispatchRILEvent(UnixSocketRawData* aMessage)
+      : mMessage(aMessage)
     { }
-    virtual ~RilClient() { }
+
+    virtual bool RunTask(JSContext *aCx);
+
+private:
+    nsAutoPtr<UnixSocketRawData> mMessage;
+};
 
-    bool OpenSocket();
+bool
+DispatchRILEvent::RunTask(JSContext *aCx)
+{
+    JSObject *obj = JS_GetGlobalObject(aCx);
 
-    virtual void OnFileCanReadWithoutBlocking(int fd);
-    virtual void OnFileCanWriteWithoutBlocking(int fd);
+    JSObject *array = JS_NewUint8Array(aCx, mMessage->mSize);
+    if (!array) {
+        return false;
+    }
 
-    ScopedClose mSocket;
-    MessageLoopForIO::FileDescriptorWatcher mReadWatcher;
-    MessageLoopForIO::FileDescriptorWatcher mWriteWatcher;
-    nsAutoPtr<RilRawData> mIncoming;
-    Mutex mMutex;
-    RilRawDataQueue mOutgoingQ;
-    bool mBlockedOnWrite;
-    MessageLoopForIO* mIOLoop;
-    nsAutoPtr<RilRawData> mCurrentRilRawData;
-    size_t mCurrentWriteOffset;
+    memcpy(JS_GetArrayBufferViewData(array), mMessage->mData, mMessage->mSize);
+    jsval argv[] = { OBJECT_TO_JSVAL(array) };
+    return JS_CallFunctionName(aCx, obj, "onRILMessage", NS_ARRAY_LENGTH(argv),
+                               argv, argv);
+}
+
+class RilConnector : public mozilla::ipc::UnixSocketConnector
+{
+public:
+  virtual ~RilConnector()
+  {}
+
+  virtual int Create();
+  virtual void CreateAddr(bool aIsServer,
+                          socklen_t& aAddrSize,
+                          struct sockaddr *aAddr,
+                          const char* aAddress);
+  virtual bool SetUp(int aFd);
+  virtual void GetSocketAddr(const sockaddr& aAddr,
+                             nsAString& aAddrStr);
 };
 
-static RefPtr<RilClient> sClient;
-static RefPtr<RilConsumer> sConsumer;
-
-//-----------------------------------------------------------------------------
-// This code runs on the IO thread.
-//
+int
+RilConnector::Create()
+{
+    MOZ_ASSERT(!NS_IsMainThread());
 
-class RilReconnectTask : public CancelableTask {
-    RilReconnectTask() : mCanceled(false) { }
-
-    virtual void Run();
-    virtual void Cancel() { mCanceled = true; }
-
-    bool mCanceled;
+    int fd = -1;
 
-public:
-    static void Enqueue(int aDelayMs = 0) {
-        MessageLoopForIO* ioLoop = MessageLoopForIO::current();
-        if (!ioLoop) {
-            NS_WARNING("No IOLoop to attach to, cancelling self!");
-            CancelIt();
-            return;
-        }
-        if (sTask) {
-            return;
-        }
-        sTask = new RilReconnectTask();
-        if (aDelayMs) {
-            ioLoop->PostDelayedTask(FROM_HERE, sTask, aDelayMs);
-        } else {
-            ioLoop->PostTask(FROM_HERE, sTask);
-        }
+#if defined(MOZ_WIDGET_GONK)
+    fd = socket(AF_LOCAL, SOCK_STREAM, 0);
+#else
+    struct hostent *hp;
+
+    hp = gethostbyname("localhost");
+    if (hp) {
+        fd = socket(hp->h_addrtype, SOCK_STREAM, 0);
+    }
+#endif
+
+    if (fd < 0) {
+        NS_WARNING("Could not open ril socket!");
+        return -1;
     }
 
-    static void CancelIt() {
-        if (!sTask) {
-            return;
-        }
-        sTask->Cancel();
-        sTask = nullptr;
+    if (!SetUp(fd)) {
+        NS_WARNING("Could not set up socket!");
     }
+    return fd;
+}
+
+void
+RilConnector::CreateAddr(bool aIsServer,
+                         socklen_t& aAddrSize,
+                         struct sockaddr *aAddr,
+                         const char* aAddress)
+{
+    // We never open ril socket as server.
+    MOZ_ASSERT(!aIsServer);
 
-private:
-    // Can *ONLY* be touched by the IO thread.  The event queue owns
-    // this memory when pointer is nonnull; do *NOT* free it manually.
-    static CancelableTask* sTask;
-};
-CancelableTask* RilReconnectTask::sTask;
+#if defined(MOZ_WIDGET_GONK)
+    struct sockaddr_un addr_un;
+
+    memset(&addr_un, 0, sizeof(addr_un));
+    strcpy(addr_un.sun_path, aAddress);
+    addr_un.sun_family = AF_LOCAL;
 
-void RilReconnectTask::Run() {
-    // NB: the order of these two statements is important!  sTask must
-    // always run, whether we've been canceled or not, to avoid
-    // leading a dangling pointer in sTask.
-    sTask = nullptr;
-    if (mCanceled) {
+    aAddrSize = strlen(aAddress) + offsetof(struct sockaddr_un, sun_path) + 1;
+    memcpy(aAddr, &addr_un, aAddrSize);
+#else
+    struct hostent *hp;
+    struct sockaddr_in addr_in;
+
+    hp = gethostbyname("localhost");
+    if (!hp) {
         return;
     }
 
-    if (sClient->OpenSocket()) {
-        return;
-    }
-    Enqueue(1000);
-}
-
-class RilWriteTask : public Task {
-    virtual void Run();
-};
+    memset(&addr_in, 0, sizeof(addr_in));
+    addr_in.sin_family = hp->h_addrtype;
+    addr_in.sin_port = htons(RIL_TEST_PORT);
+    memcpy(&addr_in.sin_addr, hp->h_addr, hp->h_length);
 
-void RilWriteTask::Run() {
-    if(sClient->mSocket.get() < 0) {
-        NS_WARNING("Trying to write to non-open socket!");
-        return;
-    }
-    sClient->OnFileCanWriteWithoutBlocking(sClient->mSocket.rwget());
-}
-
-static void
-ConnectToRil(Monitor* aMonitor, bool* aSuccess)
-{
-    MOZ_ASSERT(!sClient);
-
-    sClient = new RilClient();
-    RilReconnectTask::Enqueue();
-    *aSuccess = true;
-    {
-        MonitorAutoLock lock(*aMonitor);
-        lock.Notify();
-    }
-    // aMonitor may have gone out of scope by now, don't touch it
+    aAddrSize = sizeof(addr_in);
+    memcpy(aAddr, &addr_in, aAddrSize);
+#endif
 }
 
 bool
-RilClient::OpenSocket()
+RilConnector::SetUp(int aFd)
 {
-
-    ScopedClose skt;
-#if defined(MOZ_WIDGET_GONK)
-    // Using a network socket to test basic functionality
-    // before we see how this works on the phone.
-    struct sockaddr_un addr;
-    socklen_t alen;
-    size_t namelen;
-    int err;
-    memset(&addr, 0, sizeof(addr));
-    strcpy(addr.sun_path, RIL_SOCKET_NAME);
-    addr.sun_family = AF_LOCAL;
-    skt.reset(socket(AF_LOCAL, SOCK_STREAM, 0));
-    alen = strlen(RIL_SOCKET_NAME) + offsetof(struct sockaddr_un, sun_path) + 1;
-#else
-    struct hostent *hp;
-    struct sockaddr_in addr;
-    socklen_t alen;
-
-    hp = gethostbyname("localhost");
-    if (hp == 0) return false;
-
-    memset(&addr, 0, sizeof(addr));
-    addr.sin_family = hp->h_addrtype;
-    addr.sin_port = htons(RIL_TEST_PORT);
-    memcpy(&addr.sin_addr, hp->h_addr, hp->h_length);
-    skt.reset(socket(hp->h_addrtype, SOCK_STREAM, 0));
-    alen = sizeof(addr);
-#endif
-
-    if (skt.get() < 0) {
-        LOG("Cannot create socket for RIL!\n");
-        return false;
-    }
-
-    // Select non-blocking IO.
-    if (-1 == fcntl(skt.get(), F_SETFL, O_NONBLOCK)) {
-        return false;
-    }
-
-    if (connect(skt.get(), (struct sockaddr *) &addr, alen) < 0) {
-#if defined(MOZ_WIDGET_GONK)
-        LOG("Cannot open socket for RIL!\n");
-#endif
-        skt.dispose();
-        return false;
-    }
-
-    // Set close-on-exec bit.
-    int flags = fcntl(skt.get(), F_GETFD);
-    if (-1 == flags) {
-        return false;
-    }
-
-    flags |= FD_CLOEXEC;
-    if (-1 == fcntl(skt.get(), F_SETFD, flags)) {
-        return false;
-    }
-
-    if (!mIOLoop->WatchFileDescriptor(skt.get(),
-                                      true,
-                                      MessageLoopForIO::WATCH_READ,
-                                      &mReadWatcher,
-                                      this)) {
-        return false;
-    }
-    mSocket = skt.forget();
-    LOG("Socket open for RIL\n");
+    // Nothing to do here.
     return true;
 }
 
 void
-RilClient::OnFileCanReadWithoutBlocking(int fd)
+RilConnector::GetSocketAddr(const sockaddr& aAddr,
+                            nsAString& aAddrStr)
 {
-    // Keep reading data until either
-    //
-    //   - mIncoming is completely read
-    //     If so, sConsumer->MessageReceived(mIncoming.forget())
-    //
-    //   - mIncoming isn't completely read, but there's no more
-    //     data available on the socket
-    //     If so, break;
+    // Unused.
+    MOZ_NOT_REACHED("This should never be called!");
+}
+
+} // anonymous namespace
+
+namespace mozilla {
+namespace ipc {
 
-    MOZ_ASSERT(fd == mSocket.get());
-    while (true) {
-        if (!mIncoming) {
-            mIncoming = new RilRawData();
-            ssize_t ret = read(fd, mIncoming->mData, RilRawData::MAX_DATA_SIZE);
-            if (ret <= 0) {
-                if (ret == -1) {
-                    if (errno == EINTR) {
-                        continue; // retry system call when interrupted
-                    }
-                    else if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                        return; // no data available: return and re-poll
-                    }
-                    // else fall through to error handling on other errno's
-                }
-                LOG("Cannot read from network, error %d\n", ret);
-                // At this point, assume that we can't actually access
-                // the socket anymore, and start a reconnect loop.
-                mIncoming.forget();
-                mReadWatcher.StopWatchingFileDescriptor();
-                mWriteWatcher.StopWatchingFileDescriptor();
-                // ScopedClose will close our old socket on a reset.
-                // Setting to -1 means writes will fail with message.
-                mSocket.reset(-1);
-                RilReconnectTask::Enqueue();
-                return;
-            }
-            mIncoming->mSize = ret;
-            sConsumer->MessageReceived(mIncoming.forget());
-            if (ret < ssize_t(RilRawData::MAX_DATA_SIZE)) {
-                return;
-            }
-        }
-    }
+RilConsumer::RilConsumer(WorkerCrossThreadDispatcher* aDispatcher)
+    : mDispatcher(aDispatcher)
+    , mShutdown(false)
+{
+    ConnectSocket(new RilConnector(), RIL_SOCKET_NAME);
+}
+
+void
+RilConsumer::Shutdown()
+{
+    mShutdown = true;
+    CloseSocket();
 }
 
 void
-RilClient::OnFileCanWriteWithoutBlocking(int fd)
+RilConsumer::ReceiveSocketData(nsAutoPtr<UnixSocketRawData>& aMessage)
 {
-    // Try to write the bytes of mCurrentRilRawData.  If all were written, continue.
-    //
-    // Otherwise, save the byte position of the next byte to write
-    // within mCurrentRilRawData, and request another write when the
-    // system won't block.
-    //
+    MOZ_ASSERT(NS_IsMainThread());
 
-    MOZ_ASSERT(fd == mSocket.get());
-
-    while (true) {
-        {
-            MutexAutoLock lock(mMutex);
-
-            if (mOutgoingQ.empty() && !mCurrentRilRawData) {
-                return;
-            }
+    nsRefPtr<DispatchRILEvent> dre(new DispatchRILEvent(aMessage.forget()));
+    mDispatcher->PostTask(dre);
+}
 
-            if(!mCurrentRilRawData) {
-                mCurrentRilRawData = mOutgoingQ.front();
-                mOutgoingQ.pop();
-                mCurrentWriteOffset = 0;
-            }
-        }
-        const uint8_t *toWrite;
+void
+RilConsumer::OnConnectSuccess()
+{
+    // Nothing to do here.
+    LOG("Socket open for RIL\n");
+}
 
-        toWrite = mCurrentRilRawData->mData;
- 
-        while (mCurrentWriteOffset < mCurrentRilRawData->mSize) {
-            ssize_t write_amount = mCurrentRilRawData->mSize - mCurrentWriteOffset;
-            ssize_t written;
-            written = write (fd, toWrite + mCurrentWriteOffset,
-                             write_amount);
-            if(written > 0) {
-                mCurrentWriteOffset += written;
-            }
-            if (written != write_amount) {
-                break;
-            }
-        }
+void
+RilConsumer::OnConnectError()
+{
+    LOG("%s\n", __FUNCTION__);
+    CloseSocket();
+}
 
-        if(mCurrentWriteOffset != mCurrentRilRawData->mSize) {
-            MessageLoopForIO::current()->WatchFileDescriptor(
-                fd,
-                false,
-                MessageLoopForIO::WATCH_WRITE,
-                &mWriteWatcher,
-                this);
-            return;
-        }
-        mCurrentRilRawData = NULL;
+void
+RilConsumer::OnDisconnect()
+{
+    LOG("%s\n", __FUNCTION__);
+    if (!mShutdown) {
+        ConnectSocket(new RilConnector(), RIL_SOCKET_NAME, 1000);
     }
 }
 
-
-static void
-DisconnectFromRil(Monitor* aMonitor)
-{
-    // Prevent stale reconnect tasks from being run after we've shut
-    // down.
-    RilReconnectTask::CancelIt();
-    // XXX This might "strand" messages in the outgoing queue.  We'll
-    // assume that's OK for now.
-    sClient = nullptr;
-    {
-        MonitorAutoLock lock(*aMonitor);
-        lock.Notify();
-    }
-}
-
-//-----------------------------------------------------------------------------
-// This code runs on any thread.
-//
-
-bool
-StartRil(RilConsumer* aConsumer)
-{
-    MOZ_ASSERT(aConsumer);
-    sConsumer = aConsumer;
-
-    Monitor monitor("StartRil.monitor");
-    bool success;
-    {
-        MonitorAutoLock lock(monitor);
-
-        XRE_GetIOMessageLoop()->PostTask(
-            FROM_HERE,
-            NewRunnableFunction(ConnectToRil, &monitor, &success));
-
-        lock.Wait();
-    }
-
-    return success;
-}
-
-bool
-SendRilRawData(RilRawData** aMessage)
-{
-    if (!sClient) {
-        return false;
-    }
-
-    RilRawData *msg = *aMessage;
-    *aMessage = nullptr;
-
-    {
-        MutexAutoLock lock(sClient->mMutex);
-        sClient->mOutgoingQ.push(msg);
-    }
-    sClient->mIOLoop->PostTask(FROM_HERE, new RilWriteTask());
-
-    return true;
-}
-
-void
-StopRil()
-{
-    Monitor monitor("StopRil.monitor");
-    {
-        MonitorAutoLock lock(monitor);
-
-        XRE_GetIOMessageLoop()->PostTask(
-            FROM_HERE,
-            NewRunnableFunction(DisconnectFromRil, &monitor));
-
-        lock.Wait();
-    }
-
-    sConsumer = nullptr;
-}
-
-
 } // namespace ipc
 } // namespace mozilla
--- a/ipc/ril/Ril.h
+++ b/ipc/ril/Ril.h
@@ -2,50 +2,38 @@
 /* vim: set sw=2 ts=8 et ft=cpp: */
 /* This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 
 #ifndef mozilla_ipc_Ril_h
 #define mozilla_ipc_Ril_h 1
 
-#include "mozilla/RefPtr.h"
-
-namespace base {
-class MessageLoop;
-}
+#include <mozilla/dom/workers/Workers.h>
+#include <mozilla/ipc/UnixSocket.h>
 
 namespace mozilla {
 namespace ipc {
 
+class RilConsumer : public mozilla::ipc::UnixSocketConsumer
+{
+public:
+  RilConsumer(mozilla::dom::workers::WorkerCrossThreadDispatcher* aDispatcher);
+  virtual ~RilConsumer() { }
 
-/*
- * Represents raw data going to or coming from the RIL socket. Can
- * actually contain multiple RIL parcels in the data block, and may
- * also contain incomplete parcels on the front or back. Actual parcel
- * construction is handled in the worker thread.
- */
-struct RilRawData
-{
-    static const size_t MAX_DATA_SIZE = 1024;
-    uint8_t mData[MAX_DATA_SIZE];
+  void Shutdown();
+
+private:
+  virtual void ReceiveSocketData(nsAutoPtr<UnixSocketRawData>& aMessage);
 
-    // Number of octets in mData.
-    size_t mSize;
+  virtual void OnConnectSuccess();
+  virtual void OnConnectError();
+  virtual void OnDisconnect();
+
+private:
+  nsRefPtr<mozilla::dom::workers::WorkerCrossThreadDispatcher> mDispatcher;
+  bool mShutdown;
 };
 
-class RilConsumer : public RefCounted<RilConsumer>
-{
-public:
-    virtual ~RilConsumer() { }
-    virtual void MessageReceived(RilRawData* aMessage) { }
-};
-
-bool StartRil(RilConsumer* aConsumer);
-
-bool SendRilRawData(RilRawData** aMessage);
-
-void StopRil();
-
 } // namespace ipc
-} // namepsace mozilla
+} // namespace mozilla
 
 #endif // mozilla_ipc_Ril_h