vcsreplicator: retry message sending on error (Bug 1415233) r=gps
author Connor Sheehan <sheehan@mozilla.com>
date Wed, 12 Sep 2018 19:13:01 +0000
changeset 6457 687e1b127694
parent 6456 d8317e0430f9
child 6458 da0eedb4b48a
push id 3244
push user cosheehan@mozilla.com
push date Wed, 12 Sep 2018 20:26:44 +0000
reviewers gps
bugs 1415233
vcsreplicator: retry message sending on error (Bug 1415233) r=gps

This commit makes the vcsreplicator Producer retry message sending on error.

We are currently running into an issue on merge days and during other large
pushes to hg.mozilla.org, where the push fails during the
vcsreplicator.pretxnclose hook. This hook does nothing except send a heartbeat
to Kafka after all other message processing, to ensure the replication log is
writable. Since the hook is so simple, we believe the Kafka connection times
out while waiting for the large push to finish processing, so the heartbeat
sent over the dead connection fails.

This commit makes the message sending procedure re-initialize the client on a
failure and try to send the message again.

Differential Revision: https://phabricator.services.mozilla.com/D4773
pylib/vcsreplicator/vcsreplicator/producer.py
--- a/pylib/vcsreplicator/vcsreplicator/producer.py
+++ b/pylib/vcsreplicator/vcsreplicator/producer.py
@@ -4,19 +4,20 @@
 
 from __future__ import absolute_import, unicode_literals
 
 import json
 import logging
 import time
 
 from kafka.producer.base import Producer as KafkaProducer
+from kafka.common import KafkaError
 
 
-logger = logging.getLogger(__name__)
+logger = logging.getLogger('kafka.producer')
 
 MESSAGE_HEADER_V1 = b'1\n'
 
 
 class Producer(KafkaProducer):
     """A Kafka Producer that writes to a pre-defined topic."""
 
     def __init__(self, client, topic, **kwargs):
@@ -40,18 +41,27 @@ class Producer(KafkaProducer):
         # latency.
         o['_created'] = time.time()
         # We currently only support 1 message format. It is
         # "1\n" followed by a JSON payload. No length is encoded,
         # as Kafka does this for us.
         j = json.dumps(o, sort_keys=True)
         msg = b''.join([MESSAGE_HEADER_V1, j])
 
-        return super(Producer, self).send_messages(
-            self.topic, partition, msg)
+        try:
+            return super(Producer, self).send_messages(
+                self.topic, partition, msg)
+
+        except KafkaError:
+            logger.exception('error sending message to Kafka; '
+                             'reinitializing client to retry')
+            self.client.reinit()
+
+            return super(Producer, self).send_messages(
+                self.topic, partition, msg)
 
 
 def send_heartbeat(producer, partition):
     """Sends a dummy message to confirm that the queue is running."""
     return producer.send_message({
         'name': 'heartbeat-1',
     }, partition=partition)
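
The sketch below (not part of the changeset) shows one way the retry path could be exercised with the mock library. It assumes the base kafka-python Producer tolerates a mocked client, stores it as self.client, and that patching kafka.producer.base.Producer.send_messages intercepts the super() call in send_message; the topic name b'pushdata' and the test function name are made up for illustration. The first low-level send raises KafkaError as if the connection had timed out during a long transaction; after client.reinit() the second attempt succeeds.

import mock  # the standalone mock package on Python 2; unittest.mock on Python 3

from kafka.common import KafkaError
from vcsreplicator.producer import Producer, send_heartbeat


def test_heartbeat_retries_after_kafka_error():
    # Stand-in for the real KafkaClient; reinit() is recorded automatically.
    client = mock.MagicMock()
    producer = Producer(client, b'pushdata')  # hypothetical topic name

    # First low-level send fails as if the connection died during a long push;
    # the retry after self.client.reinit() succeeds.
    with mock.patch('kafka.producer.base.Producer.send_messages',
                    side_effect=[KafkaError('connection reset'),
                                 [mock.sentinel.response]]) as send:
        result = send_heartbeat(producer, partition=0)

    assert send.call_count == 2              # original attempt plus one retry
    client.reinit.assert_called_once_with()  # connection was re-established
    assert result == [mock.sentinel.response]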