Error when running distributed training with Edward

Greetings,

The following error arises when I try to run distributed training with Edward. The error message and code are below:

Error message:

Cannot assign a device for operation 'optimizer/dense_1/bias/RMSProp_1': Operation was explicitly assigned to /job:ps/task:0 but available devices are [ /job:localhost/replica:0/task:0/device:CPU:0 ]. Make sure the device specification refers to a valid device.
[[Node: optimizer/dense_1/bias/RMSProp_1 = VariableV2[_class=["loc:@dense_1/bias"], container="", dtype=DT_FLOAT, shape=[5], shared_name="", _device="/job:ps/task:0"]()]]
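
As a quick sanity check (separate from the VAE code below), my understanding is that this error appears whenever an op is explicitly pinned to /job:ps but the session running it only knows about local devices. A minimal sketch that should reproduce the same error, not taken from my actual script:

import tensorflow as tf

# Pin a variable to the parameter-server job, the same way
# replica_device_setter does for trainable variables and optimizer slots.
with tf.device("/job:ps/task:0"):
    v = tf.Variable(0.0, name="v")

# A plain local session only sees /job:localhost devices, so initializing v
# fails with the same "Cannot assign a device" InvalidArgumentError.
with tf.Session() as sess:
    sess.run(v.initializer)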

Code:

import os

import numpy as np
import tensorflow as tf
import edward as ed
from edward.models import Bernoulli, Normal
from edward.util import Progbar

parameter_servers = ["localhost:2222"]
workers = ["localhost:2223"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})
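
# NOTE: the script is driven by command-line flags. A minimal sketch of how
# they are defined (the flag names follow their use below, but the defaults
# and help strings are assumptions, not from the original script):
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of the task within its job")
tf.app.flags.DEFINE_string("model_path", "/tmp/dist_vae", "Checkpoint directory")
FLAGS = tf.app.flags.FLAGS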

# start a server for a specific task
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,
    task_index=FLAGS.task_index)
     
class VAE(object):
    def __init__(self, dir_path, n_features):
        # MODEL
        self.n_features = n_features
        self.params = {
            'M': 2048,           # batch size
            'd': 5,              # latent dimension
            'n_epoch': 2,
            'hidden_d': [25, 5],
            'learning_rate': 0.01
        }
        self.saved_dir_path = dir_path
        self.ckpt_path = os.path.join(self.saved_dir_path,
                                      'checkpointFiles', 'model.ckpt')
       # distributed training
       if FLAGS.job_name == "ps":
           server.join()
       elif FLAGS.job_name == "worker":
           # Between-graph replication
           with tf.device(tf.train.replica_device_setter(
                   worker_device="/job:worker/task:%d" % FLAGS.task_index,
                   cluster=cluster)):
                self.global_step = tf.get_variable(
                    'global_step', [], dtype=tf.int64,
                    initializer=tf.constant_initializer(0),
                    trainable=False)
               self.z = Normal(
                   loc=tf.zeros([self.params['M'], self.params['d']]),
                   scale=tf.ones([self.params['M'], self.params['d']]))
                # Decoder: map latent z through the hidden layers to logits.
                self.hidden = tf.layers.dense(
                    self.z, self.params['hidden_d'][0], activation=tf.nn.relu)
                if len(self.params['hidden_d']) > 1:
                    for i in xrange(1, len(self.params['hidden_d'])):
                        self.hidden = tf.layers.dense(
                            self.hidden,
                            self.params['hidden_d'][i],
                            activation=tf.nn.relu)
                self.x = Bernoulli(
                    logits=tf.layers.dense(self.hidden, self.n_features),
                    dtype=tf.float64)

               # INFERENCE
                self.x_ph = tf.placeholder(dtype=tf.float64,
                                           shape=[None, self.n_features])
                # Encoder: mirror the hidden layers in reverse order.
                self.hidden = tf.layers.dense(
                    tf.cast(self.x_ph, tf.float32),
                    self.params['hidden_d'][-1],
                    activation=tf.nn.relu)
                if len(self.params['hidden_d']) > 1:
                    for i in xrange(1, len(self.params['hidden_d'])):
                        j = -(1 + i)
                        self.hidden = tf.layers.dense(
                            self.hidden,
                            self.params['hidden_d'][j],
                            activation=tf.nn.relu)
               self.qz = Normal(
                   loc=tf.layers.dense(self.hidden, self.params['d']),
                   scale=tf.layers.dense(
                       self.hidden, self.params['d'], activation=tf.nn.softplus))
               self.x_avg = Bernoulli(
                   logits=tf.reduce_mean(self.x.parameters['logits'], 0),
                   name='x_avg')
               self.log_likli = tf.reduce_mean(self.x_avg.log_prob(self.x_ph), 1)
               self.optimizer = tf.train.RMSPropOptimizer(
                   self.params['learning_rate'], epsilon=1.0)
                # KLqp binds the generative model (z -> x) to the inference
                # network (x_ph -> qz); initialize() builds the training op.
                self.inference = ed.KLqp({self.z: self.qz},
                                         data={self.x: self.x_ph})
                self.inference_init = self.inference.initialize(
                    optimizer=self.optimizer, global_step=self.global_step,
                    logdir='log')
               self.init = tf.global_variables_initializer()
               self.saver = tf.train.Saver()

    def train(self, train_data):
        # Generate x_batch: cycle through train_data in chunks of M, wrapping
        # around at the end. This must be a nested generator; a bare `yield`
        # in train() would turn train() itself into a generator.
        def batch_generator():
            start = 0  # pointer to where we are in the iteration
            while True:
                stop = start + self.params['M']
                diff = stop - train_data.shape[0]
                if diff <= 0:
                    batch = train_data[start:stop]
                    start += self.params['M']
                else:
                    batch = np.concatenate((train_data[start:],
                                            train_data[:diff]))
                    start = diff
                yield batch
        train_data_generator = batch_generator()

        saver_hook = tf.train.CheckpointSaverHook(
            checkpoint_dir=FLAGS.model_path,
            save_steps=100,
            saver=tf.train.Saver(),
            checkpoint_basename='model.ckpt',
            scaffold=None)

       hooks = [saver_hook]

        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=(FLAGS.task_index == 0),
                checkpoint_dir=FLAGS.model_path,
                hooks=hooks,
                config=tf.ConfigProto(allow_soft_placement=True,
                                      log_device_placement=True),
                save_summaries_steps=None,
                save_summaries_secs=None) as sess:
           sess.run(self.init)
           # sess.run(self.inference_init)
           # self.inference.initialize(optimizer=self.optimizer)
            # Use float division: in Python 2, integer division would floor
            # before np.ceil is applied.
            n_iter_per_epoch = np.ceil(
                float(train_data.shape[0]) / self.params['M']).astype(int)

           for epoch in xrange(1, self.params['n_epoch'] + 1):
               print "Epoch: {0}".format(epoch)
               avg_loss = 0.0
               pbar = Progbar(n_iter_per_epoch)
               for t in xrange(1, n_iter_per_epoch + 1):
                   pbar.update(t)
                   x_batch = next(train_data_generator)
                   info_dict = self.inference.update(
                       feed_dict={self.x_ph: x_batch})
                   avg_loss += info_dict['loss']
               avg_loss /= n_iter_per_epoch
               avg_loss /= self.params['M']
               print "-log p(x) <= {:0.3f}".format(avg_loss)
           print "Done training the model."
           
if __name__ == "__main__":
    vae = VAE(dir_path, n_features)
    vae.train(data)

The full stack trace, from the worker process, is as follows:

2018-11-12 15:46:11.525824: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2018-11-12 15:46:11.527300: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222}
2018-11-12 15:46:11.527310: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2223}
2018-11-12 15:46:11.527775: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:324] Started server with target: grpc://localhost:2223
51 features are used.
2018-11-12 15:46:16.294466: I tensorflow/core/distributed_runtime/master_session.cc:1024] Start master session e39a9b7d7a1216dc with config:
Epoch: 1
 1/10 [ 10%] ███                            ETA: 0sTraceback (most recent call last):
  File "dist_vae.py", line 357, in <module>
    vae.train(data)
  File "dist_vae.py", line 220, in train
    feed_dict={self.x_ph: x_batch})
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/edward/inferences/variational_inference.py", line 154, in update
    _, t, loss = sess.run([self.train, self.increment_t, self.loss], feed_dict)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 905, in run
    run_metadata_ptr)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1137, in _run
    feed_dict_tensor, options, run_metadata)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1355, in _do_run
    options, run_metadata)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 1374, in _do_call
    raise type(e)(node_def, op, message)
tensorflow.python.framework.errors_impl.InvalidArgumentError: Cannot assign a device for operation 'optimizer/dense_1/bias/RMSProp_1': Operation was explicitly assigned to /job:ps/task:0 but available devices are [ /job:localhost/replica:0/task:0/device:CPU:0 ]. Make sure the device specification refers to a valid device.
	 [[Node: optimizer/dense_1/bias/RMSProp_1 = VariableV2[_class=["loc:@dense_1/bias"], container="", dtype=DT_FLOAT, shape=[5], shared_name="", _device="/job:ps/task:0"]()]]

Caused by op u'optimizer/dense_1/bias/RMSProp_1', defined at:
  File "dist_vae.py", line 356, in <module>
    vae = VAE(filepath, params, n_features)
  File "dist_vae.py", line 173, in __init__
    optimizer=self.optimizer, global_step = self.global_step, logdir='log')
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/edward/inferences/klqp.py", line 110, in initialize
    return super(KLqp, self).initialize(*args, **kwargs)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/edward/inferences/variational_inference.py", line 121, in initialize
    global_step=global_step)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/optimizer.py", line 520, in apply_gradients
    self._create_slots([_get_variable_for(v) for v in var_list])
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/rmsprop.py", line 115, in _create_slots
    self._zeros_slot(v, "momentum", self._name)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/optimizer.py", line 910, in _zeros_slot
    named_slots[_var_key(var)] = slot_creator.create_zeros_slot(var, op_name)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/slot_creator.py", line 174, in create_zeros_slot
    colocate_with_primary=colocate_with_primary)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/slot_creator.py", line 148, in create_slot_with_initializer
    dtype)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/training/slot_creator.py", line 67, in _create_slot_var
    validate_shape=validate_shape)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 1297, in get_variable
    constraint=constraint)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 1093, in get_variable
    constraint=constraint)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 439, in get_variable
    constraint=constraint)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 408, in _true_getter
    use_resource=use_resource, constraint=constraint)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 800, in _get_single_variable
    use_resource=use_resource)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 2157, in variable
    use_resource=use_resource)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 2147, in <lambda>
    previous_getter = lambda **kwargs: default_variable_creator(None, **kwargs)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variable_scope.py", line 2130, in default_variable_creator
    constraint=constraint)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variables.py", line 233, in __init__
    constraint=constraint)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/variables.py", line 333, in _init_from_args
    name=name)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/state_ops.py", line 134, in variable_op_v2
    shared_name=shared_name)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/ops/gen_state_ops.py", line 1043, in _variable_v2
    shared_name=shared_name, name=name)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/op_def_library.py", line 787, in _apply_op_helper
    op_def=op_def)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 3271, in create_op
    op_def=op_def)
  File "/Users/shaarvanikavula/anaconda2/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 1650, in __init__
    self._traceback = self._graph._extract_stack()  # pylint: disable=protected-access

InvalidArgumentError (see above for traceback): Cannot assign a device for operation 'optimizer/dense_1/bias/RMSProp_1': Operation was explicitly assigned to /job:ps/task:0 but available devices are [ /job:localhost/replica:0/task:0/device:CPU:0 ]. Make sure the device specification refers to a valid device.
	 [[Node: optimizer/dense_1/bias/RMSProp_1 = VariableV2[_class=["loc:@dense_1/bias"], container="", dtype=DT_FLOAT, shape=[5], shared_name="", _device="/job:ps/task:0"]()]]

When I set config=tf.ConfigProto(allow_soft_placement=True, log_device_placement=True), the device placement log shows, among other lines:

optimizer/RMSProp/value: (Const): /job:ps/replica:0/task:0/device:CPU:0
2018-11-12 15:50:32.823393: I tensorflow/core/common_runtime/placer.cc:875] optimizer/RMSProp/value: (Const)/job:ps/replica:0/task:0/device:CPU:0
global_step/Initializer/Const: (Const): /job:ps/replica:0/task:0/device:CPU:0
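
So the optimizer variables do get assigned to /job:ps when the graph is built; the failure seems to be about which devices the session that executes the training op can actually see. A quick way to inspect that would be something like the sketch below (not part of the original script; server is the tf.train.Server created above):

# List the devices visible to a session connected to this worker's server.
# If the ps devices are missing here, the session is effectively local-only.
with tf.Session(server.target) as sess:
    print sess.list_devices()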

Thanks for the help! :slight_smile: