Commit b15eb55e authored by Rémi Duraffort's avatar Rémi Duraffort Committed by Neil Williams

LAVA-950 use a ROUTER socket to connect to master

Revert c14682f7 as stretch is now required for
new lava versions.

Change-Id: I6d673d47d7933ca64bea5f30d30f6dca52a898da
parent ebbd717b
......@@ -67,7 +67,7 @@ from lava_dispatcher.job import ZMQConfig
# pylint: disable=too-many-statements
# Default values for:
PROTOCOL_VERSION = 2
PROTOCOL_VERSION = 3
# timeouts (in seconds)
TIMEOUT = 5
JOBS_CHECK_INTERVAL = 5
......@@ -266,7 +266,7 @@ def send_multipart_u(sock, data):
:param sock: The socket to use
:param data: Data to convert to byte strings
"""
return sock.send_multipart([b(d) for d in data])
return sock.send_multipart([b"master"] + [b(d) for d in data])
def send_end(sock, job_id, job, print_exception=False):
......@@ -295,10 +295,15 @@ def create_zmq_context(master_uri, hostname, ipv6, encrypt,
"""
# Connect to the master dispatcher.
context = zmq.Context()
# TODO: use a ROUTER socket
sock = context.socket(zmq.DEALER)
sock = context.socket(zmq.ROUTER)
sock.setsockopt(zmq.IDENTITY, b(hostname))
# Limit the number of messages in the queue
sock.setsockopt(zmq.SNDHWM, SEND_QUEUE)
# TODO: remove when Jessie is not supported
if hasattr(zmq, "CONNECT_RID"):
# From http://api.zeromq.org/4-2:zmq-setsockopt#toc5
# "Immediately readies that connection for data transfer with the master"
sock.setsockopt(zmq.CONNECT_RID, b"master")
if ipv6:
LOG.info("[INIT] Enabling IPv6")
......@@ -403,7 +408,6 @@ def connect_to_master(master, poller, pipe_r, sock, timeout):
:param timeout: the poll timeout
:return: True if the master had answered
"""
retry_msg = "HELLO_RETRY"
try:
sockets = dict(poller.poll(timeout * 1000))
except zmq.error.ZMQError:
......@@ -419,6 +423,12 @@ def connect_to_master(master, poller, pipe_r, sock, timeout):
msg = sock.recv_multipart()
try:
# Check master identity
master_id = u(msg.pop(0))
if master_id != "master":
LOG.error("Invalid master id '%s'. Should be 'master'",
master_id)
return False
message = u(msg[0])
except (IndexError, TypeError):
LOG.error("[INIT] Invalid message from master: %s", msg)
......@@ -431,9 +441,6 @@ def connect_to_master(master, poller, pipe_r, sock, timeout):
else:
LOG.info("[INIT] Unexpected message from master: %s", message)
LOG.info("[INIT] Greeting master => '%s' (using the same version?)",
retry_msg)
send_multipart_u(sock, [retry_msg, str(PROTOCOL_VERSION)])
return False
......@@ -628,8 +635,13 @@ def listen_to_master(master, jobs, poller, pipe_r, zmq_config, sock, timeout):
if sockets.get(sock) == zmq.POLLIN:
msg = sock.recv_multipart()
# 1: the action
# 1: identity and action
try:
master_id = u(msg.pop(0))
if master_id != "master":
LOG.error("Invalid master id '%s'. Should be 'master'",
master_id)
return
action = u(msg[0])
except (IndexError, TypeError):
LOG.error("Invalid message from master: %s", msg)
......@@ -812,7 +824,8 @@ def main():
send_multipart_u(sock, ["HELLO", str(PROTOCOL_VERSION)])
while not connect_to_master(master, poller, pipe_r, sock, timeout):
pass
LOG.info("[INIT] Greeting master => 'HELLO_RETRY' (using the same version?)")
send_multipart_u(sock, ["HELLO_RETRY", str(PROTOCOL_VERSION)])
# Loop for server instructions
LOG.info("Waiting for instructions")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment