--- a/aws/aws_watch_pending.py
+++ b/aws/aws_watch_pending.py
@@ -86,27 +86,38 @@ def aws_connect_to_region(region, secret
def aws_get_spot_requests(region, secrets, moz_instance_type):
"""retruns a list of all open and active spot requests"""
conn = aws_connect_to_region(region, secrets)
filters = {"tag:moz-type": moz_instance_type}
req = conn.get_all_spot_instance_requests(filters=filters)
return [r for r in req if r.state in ("open", "active")]
+_aws_instances_cache = {}
+
+
def aws_get_all_instances(regions, secrets):
"""
Returns a list of all instances in the given regions
"""
log.debug("fetching all instances for %s", regions)
retval = []
for region in regions:
- conn = aws_connect_to_region(region, secrets)
- reservations = conn.get_all_instances()
- for r in reservations:
- retval.extend(r.instances)
+ if region in _aws_instances_cache:
+ log.debug("aws_get_all_instances - cache hit for %s", region)
+ retval.extend(_aws_instances_cache[region])
+ else:
+ conn = aws_connect_to_region(region, secrets)
+ reservations = conn.get_all_instances()
+ region_instances = []
+ for r in reservations:
+ region_instances.extend(r.instances)
+            log.debug("aws_get_all_instances - caching %s", region)
+ _aws_instances_cache[region] = region_instances
+ retval.extend(region_instances)
return retval
def aws_filter_instances(all_instances, state=None, tags=None):
retval = []
for i in all_instances:
matched = True
if state and i.state != state:
@@ -121,16 +132,35 @@ def aws_filter_instances(all_instances,
# Skip loaned instances
matched = False
continue
if matched:
retval.append(i)
return retval
+def aws_get_running_instances(all_instances, instance_type, slaveset):
+ retval = []
+ if not slaveset:
+ allocated_slaves = get_allocated_slaves(None)
+
+ for i in all_instances:
+ if i.tags.get('moz-type') != instance_type:
+ continue
+ if i.tags.get('moz-state') != 'ready':
+ continue
+ if slaveset:
+ if i.tags.get('Name') in slaveset:
+ retval.append(i)
+ elif i.tags.get('Name') not in allocated_slaves:
+ retval.append(i)
+
+ return retval
+
+
def aws_get_reservations(regions, secrets):
"""
Return a mapping of (availability zone, ec2 instance type) -> count
"""
log.debug("getting reservations for %s", regions)
retval = {}
for region in regions:
conn = aws_connect_to_region(region, secrets)
@@ -163,21 +193,23 @@ def aws_filter_reservations(reservations
# Remove reservations that are used up
for k, count in reservations.items():
if count <= 0:
log.debug("all reservations for %s are used; removing", k)
del reservations[k]
def aws_resume_instances(moz_instance_type, start_count, regions, secrets,
- region_priorities, instance_type_changes, dryrun):
+ region_priorities, instance_type_changes, dryrun,
+ slaveset):
"""Resume up to `start_count` stopped instances of the given type in the
given regions"""
# Fetch all our instance information
all_instances = aws_get_all_instances(regions, secrets)
+
# We'll filter by these tags in general
tags = {'moz-state': 'ready', 'moz-type': moz_instance_type}
# If our instance config specifies a maximum number of running instances,
# apply that now. This may mean that we reduce start_count, or return early
# if we're already running >= max_running
instance_config = json.load(open("configs/%s" % moz_instance_type))
max_running = instance_config.get('max_running')
@@ -217,16 +249,26 @@ def aws_resume_instances(moz_instance_ty
# Filter the reservations
aws_filter_reservations(reservations, running_instances)
log.debug("filtered reservations: %s", reservations)
# List of (instance, is_reserved) tuples
to_start = []
+ log.debug("filtering by slaveset %s", slaveset)
+ # Filter the list of stopped instances by slaveset
+ if slaveset:
+ stopped_instances = filter(lambda i: i.tags.get('Name') in slaveset, stopped_instances)
+ else:
+ # Get list of all allocated slaves if we have no specific slaves
+ # required
+ allocated_slaves = get_allocated_slaves(None)
+ stopped_instances = filter(lambda i: i.tags.get('Name') not in allocated_slaves, stopped_instances)
+
# While we still have reservations, start instances that can use those
# reservations first
for i in stopped_instances[:]:
k = (i.placement, i.instance_type)
if k not in reservations:
continue
stopped_instances.remove(i)
to_start.append((i, True))
@@ -270,17 +312,17 @@ def aws_resume_instances(moz_instance_ty
log.debug("Started %s instaces, breaking early", started)
break
return started
def request_spot_instances(moz_instance_type, start_count, regions, secrets,
region_priorities, spot_config, dryrun,
- cached_cert_dir):
+ cached_cert_dir, slaveset):
started = 0
spot_rules = spot_config.get("rules", {}).get(moz_instance_type)
if not spot_rules:
log.warn("No spot rules found for %s", moz_instance_type)
return 0
instance_config = json.load(open("configs/%s" % moz_instance_type))
connections = []
@@ -333,17 +375,18 @@ def request_spot_instances(moz_instance_
log.debug("Using %s", choice)
launched = do_request_spot_instances(
amount=need,
region=region, secrets=secrets,
moz_instance_type=moz_instance_type,
ami=to_start[region]["ami"],
instance_config=instance_config, dryrun=dryrun,
cached_cert_dir=cached_cert_dir,
- spot_choice=choice)
+ spot_choice=choice,
+ slaveset=slaveset)
started += launched
if started >= start_count:
break
return started
@@ -361,54 +404,58 @@ def get_puppet_certs(ip, secrets, cached
cert_data = req.content
with open(cert_file, "wb") as f:
f.write(cert_data)
return cert_data
def do_request_spot_instances(amount, region, secrets, moz_instance_type, ami,
instance_config, cached_cert_dir, spot_choice,
- dryrun):
+ slaveset, dryrun):
started = 0
for _ in range(amount):
try:
- do_request_spot_instance(
+ r = do_request_spot_instance(
region=region, secrets=secrets,
moz_instance_type=moz_instance_type,
price=spot_choice.bid_price,
availability_zone=spot_choice.availability_zone,
ami=ami, instance_config=instance_config,
cached_cert_dir=cached_cert_dir,
- instance_type=spot_choice.instance_type, dryrun=dryrun)
- started += 1
+ instance_type=spot_choice.instance_type, slaveset=slaveset,
+ dryrun=dryrun)
+ if r:
+ started += 1
except (RuntimeError):
log.warn("Cannot start", exc_info=True)
return started
def do_request_spot_instance(region, secrets, moz_instance_type, price, ami,
instance_config, cached_cert_dir, instance_type,
- availability_zone, dryrun):
+ availability_zone, slaveset, dryrun):
conn = aws_connect_to_region(region, secrets)
interface = get_available_interface(
conn=conn, moz_instance_type=moz_instance_type,
- availability_zone=availability_zone)
+ availability_zone=availability_zone,
+ slaveset=slaveset)
if not interface:
- raise RuntimeError("No free network interfaces left in %s" % region)
+ log.warn("No free network interfaces left in %s" % region)
+ return False
# TODO: check DNS
fqdn = interface.tags.get("FQDN")
if not fqdn:
raise RuntimeError("Skipping %s without FQDN" % interface)
log.debug("Spot request for %s (%s)", fqdn, price)
if dryrun:
log.info("Dry run. skipping")
- return
+ return True
spec = NetworkInterfaceSpecification(
network_interface_id=interface.id)
nc = NetworkInterfaceCollection(spec)
ip = interface.private_ip_address
certs = get_puppet_certs(ip, secrets, cached_cert_dir)
user_data = """
FQDN="%(fqdn)s"
@@ -455,22 +502,23 @@ EOF
instance_type=instance_type,
key_name=instance_config[region]["ssh_key"],
user_data=user_data,
block_device_map=bdm,
network_interfaces=nc,
instance_profile_name=instance_config[region].get("instance_profile_name"),
)
sir[0].add_tag("moz-type", moz_instance_type)
+ return True
_cached_interfaces = {}
-def get_available_interface(conn, moz_instance_type, availability_zone):
+def get_available_interface(conn, moz_instance_type, availability_zone, slaveset):
global _cached_interfaces
if not _cached_interfaces.get(availability_zone):
_cached_interfaces[availability_zone] = {}
if _cached_interfaces[availability_zone].get(moz_instance_type) is None:
filters = {
"status": "available",
"tag:moz-type": moz_instance_type,
"availability-zone": availability_zone,
@@ -479,81 +527,159 @@ def get_available_interface(conn, moz_in
if avail_ifs:
random.shuffle(avail_ifs)
_cached_interfaces[availability_zone][moz_instance_type] = avail_ifs
log.debug("%s interfaces in %s",
len(_cached_interfaces[availability_zone][moz_instance_type]),
availability_zone)
if _cached_interfaces[availability_zone][moz_instance_type]:
- return _cached_interfaces[availability_zone][moz_instance_type].pop()
- else:
- return None
+ # Find one in our slaveset
+ if slaveset:
+ for i in _cached_interfaces[availability_zone][moz_instance_type]:
+ if i.tags.get("FQDN").split(".")[0] in slaveset:
+ _cached_interfaces[availability_zone][moz_instance_type].remove(i)
+ log.debug("using %s", i.tags.get("FQDN"))
+ return i
+ else:
+ allocated_slaves = get_allocated_slaves(None)
+ for i in _cached_interfaces[availability_zone][moz_instance_type]:
+ if i.tags.get("FQDN").split(".")[0] not in allocated_slaves:
+ _cached_interfaces[availability_zone][moz_instance_type].remove(i)
+ log.debug("using %s", i.tags.get("FQDN"))
+ return i
+ return None
def get_ami(region, secrets, moz_instance_type):
conn = aws_connect_to_region(region, secrets)
avail_amis = conn.get_all_images(
owners=["self"],
filters={"tag:moz-type": moz_instance_type})
last_ami = sorted(avail_amis,
key=lambda ami: ami.tags.get("moz-created"))[-1]
return last_ami
+JACUZZI_BASE_URL = "http://jacuzzi-allocator.pub.build.mozilla.org/v1"
+
+
+_jacuzzi_allocated_cache = {}
+
+
+def get_allocated_slaves(buildername):
+ if buildername in _jacuzzi_allocated_cache:
+ return _jacuzzi_allocated_cache[buildername]
+
+ if buildername is None:
+ log.debug("getting set of all allocated slaves")
+ r = requests.get("{0}/allocated/all".format(JACUZZI_BASE_URL))
+ _jacuzzi_allocated_cache[buildername] = frozenset(r.json()['machines'])
+ return _jacuzzi_allocated_cache[buildername]
+
+ log.debug("getting slaves allocated to %s", buildername)
+ r = requests.get("{0}/builders/{1}".format(JACUZZI_BASE_URL, buildername))
+ # Handle 404 specially
+ if r.status_code == 404:
+ _jacuzzi_allocated_cache[buildername] = None
+ return None
+ _jacuzzi_allocated_cache[buildername] = frozenset(r.json()['machines'])
+ return _jacuzzi_allocated_cache[buildername]
+
+
def aws_watch_pending(dburl, regions, secrets, builder_map, region_priorities,
spot_config, dryrun, cached_cert_dir,
instance_type_changes):
# First find pending jobs in the db
db = sa.create_engine(dburl)
pending = find_pending(db)
- # Mapping of instance types to # of instances we want to creates
+ if not pending:
+ log.info("no pending jobs! all done!")
+ return
+ log.info("processing %i pending jobs", len(pending))
+
+    # Mapping of (instance type, slaveset) to the number of instances we
+    # want to create
to_create_ondemand = defaultdict(int)
to_create_spot = defaultdict(int)
+
# Then match them to the builder_map
for pending_buildername, brid in pending:
for buildername_exp, instance_type in builder_map.items():
if re.match(buildername_exp, pending_buildername):
+ slaveset = get_allocated_slaves(pending_buildername)
if find_retries(db, brid) == 0:
- to_create_spot[instance_type] += 1
+ to_create_spot[instance_type, slaveset] += 1
else:
- to_create_ondemand[instance_type] += 1
+ to_create_ondemand[instance_type, slaveset] += 1
break
else:
log.debug("%s has pending jobs, but no instance types defined",
pending_buildername)
- for instance_type, count in to_create_spot.items():
- log.debug("need %i spot %s", count, instance_type)
+ if not to_create_spot and not to_create_ondemand:
+ log.info("no pending jobs we can do anything about! all done!")
+ return
+
+ # For each instance_type, slaveset, find how many are currently running,
+ # and scale our count accordingly
+ all_instances = aws_get_all_instances(regions, secrets)
+ for d in to_create_spot, to_create_ondemand:
+ to_delete = set()
+ for (instance_type, slaveset), count in d.iteritems():
+ running = aws_get_running_instances(all_instances, instance_type, slaveset)
+ log.debug("%i running for %s %s", len(running), instance_type, slaveset)
+ # TODO: This logic is probably too simple
+ # Reduce the number of required slaves by 10% of those that are
+ # running
+ delta = len(running) / 10
+ log.debug("reducing required count for %s %s by %i (%i running; need %i)", instance_type, slaveset, delta, len(running), count)
+ d[instance_type, slaveset] = max(0, count - delta)
+ if d[instance_type, slaveset] == 0:
+ log.debug("removing requirement for %s %s", instance_type, slaveset)
+ to_delete.add((instance_type, slaveset))
+
+ # If slaveset is not None, and all our slaves are running, we should
+ # remove it from the set of things to try and start instances for
+ if slaveset and set(i.tags.get('Name') for i in running) == slaveset:
+ log.debug("removing %s %s since all the slaves are running", instance_type, slaveset)
+ to_delete.add((instance_type, slaveset))
+
+ for instance_type, slaveset in to_delete:
+ del d[instance_type, slaveset]
+
+ for (instance_type, slaveset), count in to_create_spot.iteritems():
+ log.debug("need %i spot %s for slaveset %s", count, instance_type, slaveset)
started = request_spot_instances(
moz_instance_type=instance_type, start_count=count,
regions=regions, secrets=secrets,
region_priorities=region_priorities, spot_config=spot_config,
- dryrun=dryrun, cached_cert_dir=cached_cert_dir)
+ dryrun=dryrun, cached_cert_dir=cached_cert_dir,
+ slaveset=slaveset)
count -= started
- log.info("%s - started %i spot instances; need %i",
- instance_type, started, count)
+ log.info("%s - started %i spot instances for slaveset %s; need %i",
+ instance_type, started, slaveset, count)
# Add leftover to ondemand
- to_create_ondemand[instance_type] += count
+ to_create_ondemand[instance_type, slaveset] += count
- for instance_type, count in to_create_ondemand.items():
- log.debug("need %i ondemand %s", count, instance_type)
+ for (instance_type, slaveset), count in to_create_ondemand.iteritems():
+ log.debug("need %i ondemand %s for slaveset %s", count, instance_type, slaveset)
if count < 1:
continue
# Check for stopped instances in the given regions and start them if
# there are any
started = aws_resume_instances(instance_type, count, regions, secrets,
region_priorities,
- instance_type_changes, dryrun)
+ instance_type_changes, dryrun, slaveset)
count -= started
- log.info("%s - started %i instances; need %i",
- instance_type, started, count)
+ log.info("%s - started %i instances for slaveset %s; need %i",
+ instance_type, started, slaveset, count)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-r", "--region", action="append", dest="regions",
required=True)
parser.add_argument("-k", "--secrets", type=argparse.FileType('r'),
required=True)
@@ -583,8 +709,9 @@ if __name__ == '__main__':
secrets=secrets,
builder_map=config['buildermap'],
region_priorities=config['region_priorities'],
dryrun=args.dryrun,
spot_config=config.get("spot"),
cached_cert_dir=args.cached_cert_dir,
instance_type_changes=config.get("instance_type_changes", {})
)
+ log.info("done")