Bug 971463: Add jacuzzi support to watch_pending; scale our response to pending load by # of running instances. r=rail
authorChris AtLee <catlee@mozilla.com>
Tue, 25 Feb 2014 13:37:28 -0500
changeset 326 057aaeff75d23cfb61ef80b3c3eb29ad4da83770
parent 325 b70909d8460a200cded051e6465a7c6fbd8e2a44
child 327 f0aa275be576643b905ec41d8bf1f61a01918b2a
push id326
push usercatlee@mozilla.com
push dateTue, 25 Feb 2014 18:37:47 +0000
reviewersrail
bugs971463
Bug 971463: Add jacuzzi support to watch_pending; scale our response to pending load by # of running instances. r=rail
aws/aws_manage_instances.py
aws/aws_watch_pending.py
--- a/aws/aws_manage_instances.py
+++ b/aws/aws_manage_instances.py
@@ -118,17 +118,17 @@ if __name__ == '__main__':
     args = parser.parse_args()
     if args.secrets:
         secrets = json.load(args.secrets)
     else:
         secrets = None
 
     logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
     if not args.quiet:
-        log.setLevel(logging.DEBUG)
+        log.setLevel(logging.INFO)
     else:
         log.setLevel(logging.ERROR)
 
     if not args.regions:
         args.regions = REGIONS
     for region in args.regions:
         if secrets:
             conn = connect_to_region(
--- a/aws/aws_watch_pending.py
+++ b/aws/aws_watch_pending.py
@@ -86,27 +86,38 @@ def aws_connect_to_region(region, secret
 def aws_get_spot_requests(region, secrets, moz_instance_type):
     """retruns a list of all open and active spot requests"""
     conn = aws_connect_to_region(region, secrets)
     filters = {"tag:moz-type": moz_instance_type}
     req = conn.get_all_spot_instance_requests(filters=filters)
     return [r for r in req if r.state in ("open", "active")]
 
 
+_aws_instances_cache = {}
+
+
 def aws_get_all_instances(regions, secrets):
     """
     Returns a list of all instances in the given regions
     """
     log.debug("fetching all instances for %s", regions)
     retval = []
     for region in regions:
-        conn = aws_connect_to_region(region, secrets)
-        reservations = conn.get_all_instances()
-        for r in reservations:
-            retval.extend(r.instances)
+        if region in _aws_instances_cache:
+            log.debug("aws_get_all_instances - cache hit for %s", region)
+            retval.extend(_aws_instances_cache[region])
+        else:
+            conn = aws_connect_to_region(region, secrets)
+            reservations = conn.get_all_instances()
+            region_instances = []
+            for r in reservations:
+                region_instances.extend(r.instances)
+            log.debug("aws_get_all_instances - caching %s", region)
+            _aws_instances_cache[region] = region_instances
+            retval.extend(region_instances)
     return retval
 
 
 def aws_filter_instances(all_instances, state=None, tags=None):
     retval = []
     for i in all_instances:
         matched = True
         if state and i.state != state:
@@ -121,16 +132,35 @@ def aws_filter_instances(all_instances, 
             # Skip loaned instances
             matched = False
             continue
         if matched:
             retval.append(i)
     return retval
 
 
+def aws_get_running_instances(all_instances, instance_type, slaveset):
+    retval = []
+    if not slaveset:
+        allocated_slaves = get_allocated_slaves(None)
+
+    for i in all_instances:
+        if i.tags.get('moz-type') != instance_type:
+            continue
+        if i.tags.get('moz-state') != 'ready':
+            continue
+        if slaveset:
+            if i.tags.get('Name') in slaveset:
+                retval.append(i)
+        elif i.tags.get('Name') not in allocated_slaves:
+            retval.append(i)
+
+    return retval
+
+
 def aws_get_reservations(regions, secrets):
     """
     Return a mapping of (availability zone, ec2 instance type) -> count
     """
     log.debug("getting reservations for %s", regions)
     retval = {}
     for region in regions:
         conn = aws_connect_to_region(region, secrets)
@@ -163,21 +193,23 @@ def aws_filter_reservations(reservations
     # Remove reservations that are used up
     for k, count in reservations.items():
         if count <= 0:
             log.debug("all reservations for %s are used; removing", k)
             del reservations[k]
 
 
 def aws_resume_instances(moz_instance_type, start_count, regions, secrets,
-                         region_priorities, instance_type_changes, dryrun):
+                         region_priorities, instance_type_changes, dryrun,
+                         slaveset):
     """Resume up to `start_count` stopped instances of the given type in the
     given regions"""
     # Fetch all our instance information
     all_instances = aws_get_all_instances(regions, secrets)
+
     # We'll filter by these tags in general
     tags = {'moz-state': 'ready', 'moz-type': moz_instance_type}
 
     # If our instance config specifies a maximum number of running instances,
     # apply that now. This may mean that we reduce start_count, or return early
     # if we're already running >= max_running
     instance_config = json.load(open("configs/%s" % moz_instance_type))
     max_running = instance_config.get('max_running')
@@ -217,16 +249,26 @@ def aws_resume_instances(moz_instance_ty
 
     # Filter the reservations
     aws_filter_reservations(reservations, running_instances)
     log.debug("filtered reservations: %s", reservations)
 
     # List of (instance, is_reserved) tuples
     to_start = []
 
+    log.debug("filtering by slaveset %s", slaveset)
+    # Filter the list of stopped instances by slaveset
+    if slaveset:
+        stopped_instances = filter(lambda i: i.tags.get('Name') in slaveset, stopped_instances)
+    else:
+        # Get list of all allocated slaves if we have no specific slaves
+        # required
+        allocated_slaves = get_allocated_slaves(None)
+        stopped_instances = filter(lambda i: i.tags.get('Name') not in allocated_slaves, stopped_instances)
+
     # While we still have reservations, start instances that can use those
     # reservations first
     for i in stopped_instances[:]:
         k = (i.placement, i.instance_type)
         if k not in reservations:
             continue
         stopped_instances.remove(i)
         to_start.append((i, True))
@@ -270,17 +312,17 @@ def aws_resume_instances(moz_instance_ty
             log.debug("Started %s instaces, breaking early", started)
             break
 
     return started
 
 
 def request_spot_instances(moz_instance_type, start_count, regions, secrets,
                            region_priorities, spot_config, dryrun,
-                           cached_cert_dir):
+                           cached_cert_dir, slaveset):
     started = 0
     spot_rules = spot_config.get("rules", {}).get(moz_instance_type)
     if not spot_rules:
         log.warn("No spot rules found for %s", moz_instance_type)
         return 0
 
     instance_config = json.load(open("configs/%s" % moz_instance_type))
     connections = []
@@ -333,17 +375,18 @@ def request_spot_instances(moz_instance_
         log.debug("Using %s", choice)
         launched = do_request_spot_instances(
             amount=need,
             region=region, secrets=secrets,
             moz_instance_type=moz_instance_type,
             ami=to_start[region]["ami"],
             instance_config=instance_config, dryrun=dryrun,
             cached_cert_dir=cached_cert_dir,
-            spot_choice=choice)
+            spot_choice=choice,
+            slaveset=slaveset)
         started += launched
 
         if started >= start_count:
             break
 
     return started
 
 
@@ -361,54 +404,58 @@ def get_puppet_certs(ip, secrets, cached
     cert_data = req.content
     with open(cert_file, "wb") as f:
         f.write(cert_data)
     return cert_data
 
 
 def do_request_spot_instances(amount, region, secrets, moz_instance_type, ami,
                               instance_config, cached_cert_dir, spot_choice,
-                              dryrun):
+                              slaveset, dryrun):
     started = 0
     for _ in range(amount):
         try:
-            do_request_spot_instance(
+            r = do_request_spot_instance(
                 region=region, secrets=secrets,
                 moz_instance_type=moz_instance_type,
                 price=spot_choice.bid_price,
                 availability_zone=spot_choice.availability_zone,
                 ami=ami, instance_config=instance_config,
                 cached_cert_dir=cached_cert_dir,
-                instance_type=spot_choice.instance_type, dryrun=dryrun)
-            started += 1
+                instance_type=spot_choice.instance_type, slaveset=slaveset,
+                dryrun=dryrun)
+            if r:
+                started += 1
         except (RuntimeError):
             log.warn("Cannot start", exc_info=True)
     return started
 
 
 def do_request_spot_instance(region, secrets, moz_instance_type, price, ami,
                              instance_config, cached_cert_dir, instance_type,
-                             availability_zone, dryrun):
+                             availability_zone, slaveset, dryrun):
     conn = aws_connect_to_region(region, secrets)
     interface = get_available_interface(
         conn=conn, moz_instance_type=moz_instance_type,
-        availability_zone=availability_zone)
+        availability_zone=availability_zone,
+        slaveset=slaveset)
     if not interface:
-        raise RuntimeError("No free network interfaces left in %s" % region)
+        log.warn("No free network interfaces left in %s", region)
+        return False
 
     # TODO: check DNS
     fqdn = interface.tags.get("FQDN")
     if not fqdn:
         raise RuntimeError("Skipping %s without FQDN" % interface)
 
     log.debug("Spot request for %s (%s)", fqdn, price)
 
     if dryrun:
         log.info("Dry run. skipping")
-        return
+        return True
 
     spec = NetworkInterfaceSpecification(
         network_interface_id=interface.id)
     nc = NetworkInterfaceCollection(spec)
     ip = interface.private_ip_address
     certs = get_puppet_certs(ip, secrets, cached_cert_dir)
     user_data = """
 FQDN="%(fqdn)s"
@@ -455,22 +502,23 @@ EOF
         instance_type=instance_type,
         key_name=instance_config[region]["ssh_key"],
         user_data=user_data,
         block_device_map=bdm,
         network_interfaces=nc,
         instance_profile_name=instance_config[region].get("instance_profile_name"),
     )
     sir[0].add_tag("moz-type", moz_instance_type)
+    return True
 
 
 _cached_interfaces = {}
 
 
-def get_available_interface(conn, moz_instance_type, availability_zone):
+def get_available_interface(conn, moz_instance_type, availability_zone, slaveset):
     global _cached_interfaces
     if not _cached_interfaces.get(availability_zone):
         _cached_interfaces[availability_zone] = {}
     if _cached_interfaces[availability_zone].get(moz_instance_type) is None:
         filters = {
             "status": "available",
             "tag:moz-type": moz_instance_type,
             "availability-zone": availability_zone,
@@ -479,81 +527,159 @@ def get_available_interface(conn, moz_in
         if avail_ifs:
             random.shuffle(avail_ifs)
         _cached_interfaces[availability_zone][moz_instance_type] = avail_ifs
 
     log.debug("%s interfaces in %s",
               len(_cached_interfaces[availability_zone][moz_instance_type]),
               availability_zone)
     if _cached_interfaces[availability_zone][moz_instance_type]:
-        return _cached_interfaces[availability_zone][moz_instance_type].pop()
-    else:
-        return None
+        # Find one in our slaveset
+        if slaveset:
+            for i in _cached_interfaces[availability_zone][moz_instance_type]:
+                if (i.tags.get("FQDN") or "").split(".")[0] in slaveset:
+                    _cached_interfaces[availability_zone][moz_instance_type].remove(i)
+                    log.debug("using %s", i.tags.get("FQDN"))
+                    return i
+        else:
+            allocated_slaves = get_allocated_slaves(None)
+            for i in _cached_interfaces[availability_zone][moz_instance_type]:
+                if (i.tags.get("FQDN") or "").split(".")[0] not in allocated_slaves:
+                    _cached_interfaces[availability_zone][moz_instance_type].remove(i)
+                    log.debug("using %s", i.tags.get("FQDN"))
+                    return i
+    return None
 
 
 def get_ami(region, secrets, moz_instance_type):
     conn = aws_connect_to_region(region, secrets)
     avail_amis = conn.get_all_images(
         owners=["self"],
         filters={"tag:moz-type": moz_instance_type})
     last_ami = sorted(avail_amis,
                       key=lambda ami: ami.tags.get("moz-created"))[-1]
     return last_ami
 
 
+JACUZZI_BASE_URL = "http://jacuzzi-allocator.pub.build.mozilla.org/v1"
+
+
+_jacuzzi_allocated_cache = {}
+
+
+def get_allocated_slaves(buildername):
+    if buildername in _jacuzzi_allocated_cache:
+        return _jacuzzi_allocated_cache[buildername]
+
+    if buildername is None:
+        log.debug("getting set of all allocated slaves")
+        r = requests.get("{0}/allocated/all".format(JACUZZI_BASE_URL))
+        _jacuzzi_allocated_cache[buildername] = frozenset(r.json()['machines'])
+        return _jacuzzi_allocated_cache[buildername]
+
+    log.debug("getting slaves allocated to %s", buildername)
+    r = requests.get("{0}/builders/{1}".format(JACUZZI_BASE_URL, buildername))
+    # Handle 404 specially
+    if r.status_code == 404:
+        _jacuzzi_allocated_cache[buildername] = None
+        return None
+    _jacuzzi_allocated_cache[buildername] = frozenset(r.json()['machines'])
+    return _jacuzzi_allocated_cache[buildername]
+
+
 def aws_watch_pending(dburl, regions, secrets, builder_map, region_priorities,
                       spot_config, dryrun, cached_cert_dir,
                       instance_type_changes):
     # First find pending jobs in the db
     db = sa.create_engine(dburl)
     pending = find_pending(db)
 
-    # Mapping of instance types to # of instances we want to creates
+    if not pending:
+        log.info("no pending jobs! all done!")
+        return
+    log.info("processing %i pending jobs", len(pending))
+
+    # Mapping of (instance types, slaveset) to # of instances we want to
+    # creates
     to_create_ondemand = defaultdict(int)
     to_create_spot = defaultdict(int)
+
     # Then match them to the builder_map
     for pending_buildername, brid in pending:
         for buildername_exp, instance_type in builder_map.items():
             if re.match(buildername_exp, pending_buildername):
+                slaveset = get_allocated_slaves(pending_buildername)
                 if find_retries(db, brid) == 0:
-                    to_create_spot[instance_type] += 1
+                    to_create_spot[instance_type, slaveset] += 1
                 else:
-                    to_create_ondemand[instance_type] += 1
+                    to_create_ondemand[instance_type, slaveset] += 1
                 break
         else:
             log.debug("%s has pending jobs, but no instance types defined",
                       pending_buildername)
 
-    for instance_type, count in to_create_spot.items():
-        log.debug("need %i spot %s", count, instance_type)
+    if not to_create_spot and not to_create_ondemand:
+        log.info("no pending jobs we can do anything about! all done!")
+        return
+
+    # For each instance_type, slaveset, find how many are currently running,
+    # and scale our count accordingly
+    all_instances = aws_get_all_instances(regions, secrets)
+    for d in to_create_spot, to_create_ondemand:
+        to_delete = set()
+        for (instance_type, slaveset), count in d.iteritems():
+            running = aws_get_running_instances(all_instances, instance_type, slaveset)
+            log.debug("%i running for %s %s", len(running), instance_type, slaveset)
+            # TODO: This logic is probably too simple
+            # Reduce the number of required slaves by 10% of those that are
+            # running
+            delta = len(running) / 10
+            log.debug("reducing required count for %s %s by %i (%i running; need %i)", instance_type, slaveset, delta, len(running), count)
+            d[instance_type, slaveset] = max(0, count - delta)
+            if d[instance_type, slaveset] == 0:
+                log.debug("removing requirement for %s %s", instance_type, slaveset)
+                to_delete.add((instance_type, slaveset))
+
+            # If slaveset is not None, and all our slaves are running, we should
+            # remove it from the set of things to try and start instances for
+            if slaveset and set(i.tags.get('Name') for i in running) == slaveset:
+                log.debug("removing %s %s since all the slaves are running", instance_type, slaveset)
+                to_delete.add((instance_type, slaveset))
+
+        for instance_type, slaveset in to_delete:
+            del d[instance_type, slaveset]
+
+    for (instance_type, slaveset), count in to_create_spot.iteritems():
+        log.debug("need %i spot %s for slaveset %s", count, instance_type, slaveset)
         started = request_spot_instances(
             moz_instance_type=instance_type, start_count=count,
             regions=regions, secrets=secrets,
             region_priorities=region_priorities, spot_config=spot_config,
-            dryrun=dryrun, cached_cert_dir=cached_cert_dir)
+            dryrun=dryrun, cached_cert_dir=cached_cert_dir,
+            slaveset=slaveset)
         count -= started
-        log.info("%s - started %i spot instances; need %i",
-                 instance_type, started, count)
+        log.info("%s - started %i spot instances for slaveset %s; need %i",
+                 instance_type, started, slaveset, count)
 
         # Add leftover to ondemand
-        to_create_ondemand[instance_type] += count
+        to_create_ondemand[instance_type, slaveset] += count
 
-    for instance_type, count in to_create_ondemand.items():
-        log.debug("need %i ondemand %s", count, instance_type)
+    for (instance_type, slaveset), count in to_create_ondemand.iteritems():
+        log.debug("need %i ondemand %s for slaveset %s", count, instance_type, slaveset)
         if count < 1:
             continue
 
         # Check for stopped instances in the given regions and start them if
         # there are any
         started = aws_resume_instances(instance_type, count, regions, secrets,
                                        region_priorities,
-                                       instance_type_changes, dryrun)
+                                       instance_type_changes, dryrun, slaveset)
         count -= started
-        log.info("%s - started %i instances; need %i",
-                 instance_type, started, count)
+        log.info("%s - started %i instances for slaveset %s; need %i",
+                 instance_type, started, slaveset, count)
 
 if __name__ == '__main__':
     import argparse
     parser = argparse.ArgumentParser()
     parser.add_argument("-r", "--region", action="append", dest="regions",
                         required=True)
     parser.add_argument("-k", "--secrets", type=argparse.FileType('r'),
                         required=True)
@@ -583,8 +709,9 @@ if __name__ == '__main__':
         secrets=secrets,
         builder_map=config['buildermap'],
         region_priorities=config['region_priorities'],
         dryrun=args.dryrun,
         spot_config=config.get("spot"),
         cached_cert_dir=args.cached_cert_dir,
         instance_type_changes=config.get("instance_type_changes", {})
     )
+    log.info("done")