Parallel k-fold cross validation of edward model with python multiprocessing


#1

I am looking to implement a k-fold cross validation in parallel using the multiprocessor module in python. A toy implementation is below

import numpy as np
np.random.seed(0)
from joblib import Parallel, delayed    
import edward as ed        
import tensorflow as tf
tf.set_random_seed(0)
from edward.models import Normal, Empirical
ed.set_seed(42)

def CV_split(splits):
 
    sess = tf.InteractiveSession()
    with sess.as_default():  
        def build_toy_dataset(N, noise_std=0.5):
          X = np.concatenate([np.linspace(0, 2, num=N / 2),
                              np.linspace(6, 8, num=N / 2)])
          #X = np.random.normal(loc=5,scale=1,size=N)
          y = 2.0 * X + 10 + np.random.normal(0, noise_std, size=N)
          X = X.reshape((N, 1))
          return X, y
        
        N = 1000  # number of data points
        D = 1  # number of features
        
        # DATA
        X_train, y_train = build_toy_dataset(N)
        X_test, y_test = build_toy_dataset(N)
        
        # MODEL
        X = tf.placeholder(tf.float32, [N, D])
        w = Normal(loc=tf.zeros(D), scale=tf.ones(D))
        b = Normal(loc=tf.zeros(1), scale=tf.ones(1))
        y = Normal(loc=ed.dot(X, w) + b, scale=tf.ones(N))
        
        # INFERENCE
        T = 500                        # Number of samples.
        nburn = 100                     # Number of burn-in samples.
        stride = 10                    # Frequency with which to plot samples.
        qw = Empirical(params=tf.Variable(tf.random_normal([T, D])))
        qb = Empirical(params=tf.Variable(tf.random_normal([T, 1])))
        
        inference = ed.HMC({w: qw, b: qb}, data={X: X_train, y: y_train})
        inference.run(step_size=1e-3)
        
        #sess = ed.get_session()
        mean_qw, mean_qb = sess.run([qw.mean(),qb.mean()])
            
        print("Inferred posterior mean:")
        print(mean_qw, mean_qb)

        
    return mean_qw
    

val = np.arange(0,2,1)
results = Parallel(n_jobs=2, verbose=1, backend="threading")(
                 map(delayed(CV_split), val))        
Y_plot = np.concatenate(results,axis=0 ) 

the model in the CV_Split function is the Bayesian linear regression example with same data but the intent is to use different folds of data evaluated separately on each process. The current model gives the following error

TypeError: Cannot interpret feed_dict key as Tensor: Tensor Tensor("Placeholder:0", shape=(1000, 1), dtype=float32) is not an element of this graph.

Which I assume is due to both processes tying to access the same session. Does anyone have pointers on how to declare a separate session and graph for each process inside the function.


#2

Have you tried defining separate graphs as described in https://www.tensorflow.org/api_docs/python/tf/Graph together with the separate sessions?


#3

Yes (code below), still getting the same error message.

def CV_split(splits):
    
    with tf.Graph().as_default() as g:
        
        def build_toy_dataset(N, noise_std=0.5):
          X = np.concatenate([np.linspace(0, 2, num=N / 2),
                              np.linspace(6, 8, num=N / 2)])
          #X = np.random.normal(loc=5,scale=1,size=N)
          y = 2.0 * X + 10 + np.random.normal(0, noise_std, size=N)
          X = X.reshape((N, 1))
          return X, y        
                
        N = 1000  # number of data points
        D = 1  # number of features
        
        # DATA
        X_train, y_train = build_toy_dataset(N)
        X_test, y_test = build_toy_dataset(N)
        
        # MODEL
        X = tf.placeholder(tf.float32, [N, D])
        w = Normal(loc=tf.zeros(D), scale=tf.ones(D))
        b = Normal(loc=tf.zeros(1), scale=tf.ones(1))
        y = Normal(loc=ed.dot(X, w) + b, scale=tf.ones(N))
        
        # INFERENCE
        T = 500                        # Number of samples.
        nburn = 100                     # Number of burn-in samples.
        stride = 10                    # Frequency with which to plot samples.
        qw = Empirical(params=tf.Variable(tf.random_normal([T, D])))
        qb = Empirical(params=tf.Variable(tf.random_normal([T, 1])))
        
        print (g)
        print (T)

        sess = tf.InteractiveSession(graph=g)
        with sess.as_default(): 
            
            inference = ed.HMC({w: qw, b: qb}, data={X: X_train, y: y_train})
            inference.run(step_size=1e-3)
            
            #sess = ed.get_session()
            mean_qw, mean_qb = sess.run([qw.mean(),qb.mean()])
                
            print("Inferred posterior mean:")
            print(mean_qw, mean_qb)

        
    return mean_qw
    
val = np.arange(0,2,1)
results = Parallel(n_jobs=2, verbose=1, backend="threading")(
                 map(delayed(CV_split), val))        
Y_plot = np.concatenate(results,axis=0 )