# Copyright 2016 Hewlett Packard Enterprise Development LP
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for
# the specific language governing permissions and limitations under the License.
from __future__ import absolute_import
import ctypes
import hashlib
import os
import subprocess
import string
import re
from collections import namedtuple
import opcode
import inspect
from dis import findlinestarts
import six
import numpy as np
from numpy.ctypeslib import ndpointer
from .expression import TensorType, ExpressionDAG, input, OutputTensor
from .local import version, cache_directory, cuda_enabled, cuda_directory, logger, cxx
from . import language_pb2 as lang
# Default number of CUDA threads per block used when launching generated GPU kernels.
_default_cuda_threads_per_block = 32
class _DynamicLibOp(object):
    # Lazily-built singleton around the TensorFlow custom-op shared library.
    # The compiled module handle, cached after the first call to module().
    _loaded_module = None
    # Whether the TF shape-inference function has been registered.
    # NOTE: attribute name carries a historical typo ("infernence"); kept as-is.
    _shape_infernence_registered = False
    # Whether the TF gradient function has been registered.
    _gradient_registered = False

    @staticmethod
    def module():
        """
        Return the loaded dynamiclibop TensorFlow module, compiling the shared
        library on first use and registering shape-inference and gradient hooks.

        The C++ source (dynamiclibop.cc, shipped next to this file) is compiled
        into the cache directory, versioned by the package version, so a rebuild
        happens at most once per version. Raises subprocess.CalledProcessError
        if compilation fails (after logging the compiler output).
        """
        import tensorflow as tf
        if _DynamicLibOp._loaded_module is None:
            # Library name is versioned so stale builds are never reused.
            libname = 'dynamiclibop.so.' + version
            dynamiclibop_path = os.path.join(cache_directory, libname)
            # build the library if it does not exist already
            if not os.path.exists(dynamiclibop_path):
                tf_include = tf.sysconfig.get_include()
                # resolve the directory of this file
                this_file_path = os.path.abspath(__file__)
                this_directory = os.path.split(this_file_path)[0]
                try:
                    if cuda_enabled:
                        logger.info('building dynamiclibop for GPU')
                        subprocess.check_output([cxx, '-fPIC', '-Wall', '-shared',
                                                 '-std=c++11', '-O2', '-Wextra', '-DGOOGLE_CUDA=1',
                                                 '-o', dynamiclibop_path,
                                                 this_directory + '/dynamiclibop.cc',
                                                 '-isystem', cuda_directory + '/include',
                                                 '-isystem', tf_include],
                                                stderr=subprocess.STDOUT,
                                                universal_newlines=True)
                    else:
                        logger.info('building dynamiclibop for CPU')
                        subprocess.check_output([cxx, '-fPIC', '-Wall', '-shared',
                                                 '-std=c++11', '-O2', '-Wextra',
                                                 '-o', dynamiclibop_path,
                                                 this_directory + '/dynamiclibop.cc',
                                                 '-isystem', tf_include],
                                                stderr=subprocess.STDOUT,
                                                universal_newlines=True)
                except subprocess.CalledProcessError as exception:
                    # Surface the compiler's stderr/stdout before re-raising.
                    logger.error('c++ compiler error: ' + exception.output)
                    raise
            _DynamicLibOp._loaded_module = tf.load_op_library(dynamiclibop_path)
            if _DynamicLibOp._shape_infernence_registered is False:
                _DynamicLibOp._shape_infernence_registered = True

                # Output shapes are carried verbatim in the op's 'out_shapes' attr.
                @tf.RegisterShape("DynamicLib")
                def get_out_shapes(op):
                    return op.get_attr('out_shapes')
            if not _DynamicLibOp._gradient_registered:
                from tensorflow.python.framework import ops as tf_ops

                @tf_ops.RegisterGradient("DynamicLib")
                def _dynamic_lib_grad(op, *grads):
                    # An empty serialized gradient dag means the op has no gradient:
                    # return None for every input.
                    if op.get_attr('serialized_grad_dag') == '':
                        return [None]*len(op.inputs)
                    grad_dag = lang.OperatorDAG()
                    grad_dag.ParseFromString(op.get_attr('serialized_grad_dag'))
                    # Normalize grads to a list whether or not TF handed us a sequence.
                    try:
                        len(grads)
                    except TypeError:
                        grad_list = [grads]
                    else:
                        grad_list = list(grads)
                    # Gradient dag inputs are the op's inputs followed by the incoming grads.
                    grad_inputs = []
                    for op_input in op.inputs:
                        grad_inputs.append(op_input)
                    for grad in grad_list:
                        grad_inputs.append(grad)
                    grad_of_grad_dags = [None]*len(grad_dag.operators)
                    # make sure that the input types and expected types are consistent
                    for grad_input_index, grad_input in enumerate(grad_inputs):
                        received_type = TensorType.like(grad_input).as_proto()
                        expected_type = grad_dag.dag_input_types[grad_input_index]
                        if received_type != expected_type:
                            raise TypeError('Received a tensor of type: ' + str(received_type) +
                                            ', but expected a type: ' + str(expected_type) +
                                            ' at gradient input index: ' + str(grad_input_index))
                    return _dag_to_tf(grad_dag, grad_inputs, grad_of_grad_dags)
                _DynamicLibOp._gradient_registered = True
        return _DynamicLibOp._loaded_module
class _TensorParam(ctypes.Structure):
_fields_ = [("data", ctypes.c_void_p),
("dtype", ctypes.c_int),
("len", ctypes.c_size_t)]
class OperatorOutput(object):
    """
    Class which represents an un-evaluated output tensor, used for building lazily
    evaluated DAGs of operators.

    Infix operators (+, -, *, ...) delegate to functions registered at import time
    via register_magic_method, which build further operator nodes instead of
    computing values.
    """
    # raise operator priority so that numpy does not try to use its own operator
    __array_priority__ = 1
    # registry mapping operator keys ('add', 'sub', ...) to implementing functions
    _builtin_ops = {}

    @staticmethod
    def register_magic_method(key, value):
        """Register the function implementing the infix operator identified by ``key``."""
        OperatorOutput._builtin_ops[key] = value

    def __init__(self, parent, index):
        """
        :param parent: the _Operator producing this output
        :param index: which of the parent's outputs this object refers to
        :raises TypeError: if parent is not an _Operator or index is not an int
        """
        if not isinstance(parent, _Operator):
            raise TypeError('parent must be an Operator')
        if not isinstance(index, int):
            raise TypeError('index must be an int')
        self.parent = parent
        self.index = index
        self.tensor_type = parent.output_types[index]
        self.shape = parent.output_types[index].shape
        self.dtype = parent.output_types[index].dtype
        self.size = self.tensor_type.size

    def __add__(self, other):
        return OperatorOutput._builtin_ops['add'](self, other)

    def __radd__(self, other):
        return OperatorOutput._builtin_ops['add'](other, self)

    def __sub__(self, other):
        return OperatorOutput._builtin_ops['sub'](self, other)

    def __rsub__(self, other):
        return OperatorOutput._builtin_ops['sub'](other, self)

    def __mul__(self, other):
        return OperatorOutput._builtin_ops['mul'](self, other)

    def __rmul__(self, other):
        return OperatorOutput._builtin_ops['mul'](other, self)

    # python 2
    def __div__(self, other):
        return OperatorOutput._builtin_ops['div'](self, other)

    def __rdiv__(self, other):
        return OperatorOutput._builtin_ops['div'](other, self)

    # python 3
    def __truediv__(self, other):
        return OperatorOutput._builtin_ops['div'](self, other)

    def __rtruediv__(self, other):
        return OperatorOutput._builtin_ops['div'](other, self)

    def __mod__(self, other):
        return OperatorOutput._builtin_ops['mod'](self, other)

    def __rmod__(self, other):
        return OperatorOutput._builtin_ops['mod'](other, self)

    def __eq__(self, other):
        return OperatorOutput._builtin_ops['eq'](self, other)

    def __ne__(self, other):
        return OperatorOutput._builtin_ops['ne'](self, other)

    def __lt__(self, other):
        return OperatorOutput._builtin_ops['lt'](self, other)

    def __le__(self, other):
        return OperatorOutput._builtin_ops['le'](self, other)

    def __gt__(self, other):
        return OperatorOutput._builtin_ops['gt'](self, other)

    def __ge__(self, other):
        return OperatorOutput._builtin_ops['ge'](self, other)

    def __neg__(self):
        return OperatorOutput._builtin_ops['neg'](self)

    # Defining __eq__ would otherwise set __hash__ to None in Python 3, making
    # these symbolic tensors unhashable. Comparisons build expression nodes
    # rather than returning booleans, so identity hashing is the right contract.
    __hash__ = object.__hash__

    def __bool__(self):
        # BUG FIX: this was decorated @staticmethod while still taking `self`,
        # which broke truth testing with a TypeError instead of the intended error.
        raise SyntaxError('Cannot resolve operator values at interpretation time.')

    # Python 2 truth testing uses __nonzero__.
    __nonzero__ = __bool__
class _GradientPlaceholder(object):
    """
    Stand-in for a tensor argument when a gradient function is interpreted,
    carrying only type information (shape, dtype, rank) and no data.
    """
    def __init__(self, shape, dtype):
        tensor_type = TensorType(shape, dtype)
        self.tensor_type = tensor_type
        self.rank = tensor_type.rank
        self.shape = shape
        self.dtype = dtype
class _Operator(object):
"""
Class which is extended to define a new operator and its gradient.
"""
# raise operator priority so that numpy does not try to use its own operator
__array_priority__ = 1
def __init__(self, dag, output_types, inputs, grad_dag, from_gradient, name):
self.inputs = inputs
self.expression_dag = dag
self.output_types = output_types
self.name = name
self.grad_dag = grad_dag
self.from_gradient = from_gradient
logger.debug('Operator created: ' + str(dag.name))
def __getitem__(self, item):
return OperatorOutput(self, item)
def check_binary(self):
if len(self.output_types) != 1:
raise SyntaxError('Cannot use binary infix operators on multi-output operators. '
'Explicitly index an output.')
def __add__(self, other):
self.check_binary()
return self[0] + other
def __radd__(self, other):
self.check_binary()
return other + self[0]
def __sub__(self, other):
self.check_binary()
return self[0] - other
def __rsub__(self, other):
self.check_binary()
return other - self[0]
def __mul__(self, other):
self.check_binary()
return self[0] * other
def __rmul__(self, other):
self.check_binary()
return other * self[0]
# python 2
def __div__(self, other):
self.check_binary()
return self[0] / other
def __rdiv__(self, other):
self.check_binary()
return other / self[0]
# python 3
def __truediv__(self, other):
self.check_binary()
return self[0] / other
def __rtruediv__(self, other):
self.check_binary()
return other / self[0]
def __mod__(self, other):
self.check_binary()
return self[0] % other
def __rmod__(self, other):
self.check_binary()
return other % self[0]
def __eq__(self, other):
self.check_binary()
return self[0] == other
def __ne__(self, other):
self.check_binary()
return self[0] != other
def __lt__(self, other):
self.check_binary()
return self[0] < other
def __le__(self, other):
self.check_binary()
return self[0] <= other
def __gt__(self, other):
self.check_binary()
return self[0] > other
def __ge__(self, other):
self.check_binary()
return self[0] >= other
def __neg__(self):
self.check_binary()
return -self[0]
def _resolve_output(x):
    """
    Resolve whether or not an object is an OperatorOutput. Converts single-output Operators to an OperatorOutput
    :param x: the argument to resolve
    :return: the argument converted into an OperatorOutput
    :raises ValueError: When argument is a multi-output Operator
    :raises TypeError: When argument is neither an Operator nor an OperatorOutput
    """
    if isinstance(x, _Operator):
        # BUG FIX: was `len(x.output_types) is not 1`, which identity-compares an
        # int and only works by CPython's small-int interning (SyntaxWarning on
        # modern interpreters). Use a value comparison.
        if len(x.output_types) != 1:
            raise ValueError('Only a single-output Operator can be used as an input to another operator. '
                             'Index a specific output from multi-output Operators.')
        return x[0]
    elif not isinstance(x, OperatorOutput):
        raise TypeError('Only operator outputs can be used to build an op dag. Received a ' + str(type(x)))
    else:
        return x
def _op_hash(op):
    """Return a deterministic name for ``op``: 'f' + SHA-224 of its serialized proto salted with the package version."""
    digest = hashlib.sha224(op.SerializeToString() + version.encode('utf-8')).hexdigest()
    return 'f' + digest
class _OpGenerator(object):
def __init__(self, op_function, forbid_none_valued_constants, name):
self.op_function = op_function
self.forbid_none_valued_constants = forbid_none_valued_constants
# name this op based on it's function name unless a name is supplied to the operator decorator
if name is None:
self.name = self.op_function.__name__
else:
self.name = self.name
self.grad_function = None
def __doc__(self):
return self.op_function.__doc__
def __str__(self):
return self.op_function.__name__
def __call__(self, *inputs, **defined_constants):
# Determine if this function is part of a gradient graph. If so, do not generate a gradient DAG.
from_gradient = False
for inp in inputs:
try:
if inp.from_gradient:
from_gradient = True
except AttributeError:
pass
if isinstance(inp, _GradientPlaceholder):
from_gradient = True
func_args, func_varargs, func_keywords, func_defaults = inspect.getargspec(self.op_function)
if func_defaults is None:
input_names = func_args
constants = defined_constants
else:
input_names = func_args[:-len(func_defaults)]
constants = dict(zip(func_args[-len(func_defaults):], func_defaults))
constants.update(defined_constants)
if self.forbid_none_valued_constants:
for key in constants.keys():
if constants[key] is None:
raise ValueError(self.name + ' argument ' + key + ' is None, which implies an unset constant.\n'
' If a None constant is meaningful for this operator, the operator should be '
'defined with the appropriate decorator flag.')
input_types = []
for inp_n, inp in enumerate(inputs):
try:
inp = _resolve_output(inp)
except TypeError:
pass
try:
input_types.append(TensorType.like(inp))
except AttributeError:
raise TypeError('Unexpectedly received a ' + inp.__class__.__name__ +
' instead of a tensor at argument position ' +
str(inp_n + 1) + ' in the call to ' + self.name + '. '
'Should this argument be passed as a constant (keyword argument) instead?')
num_inputs = len(input_types)
if len(input_names) != num_inputs and func_varargs is None:
err_msg = '\n'
err_msg += self.name + ' function signature expects ' + str(len(input_names)) + \
' input tensor argument(s):\n' + str(input_names) + '\n'
err_msg += 'but was supplied with ' + str(num_inputs) + '.\n'
if len(input_names) > num_inputs:
remaining_names = input_names[num_inputs - len(input_names):]
err_msg += 'Should ' + str(remaining_names) + ' be passed to constructor as constant?'
raise TypeError(err_msg)
ExpressionDAG.clear()
# create input expressions
input_exprs = []
for cur_type in input_types:
input_exprs.append(input(TensorType.like(cur_type)))
# interpret function to build up ExpressionDAG
output_exprs = self.op_function(*input_exprs, **constants)
if output_exprs is None:
raise ValueError('No outputs returned from op function')
# wrap as list if only one output
try:
len(output_exprs)
except TypeError:
output_exprs = [output_exprs]
# make sure number of returned parameters equals the number of declared outputs
if len(output_exprs) != ExpressionDAG.num_outputs:
raise ValueError('Defined ' + str(ExpressionDAG.num_outputs) + ' outputs, but returned ' +
str(len(output_exprs)) +
'. Number of defined outputs must equal number of returned outputs.')
# make sure all returned values are output expressions
# reorder output io_index according to return order instead of declaration order
output_types = []
prev_index = []
for index, expr in enumerate(output_exprs):
if type(expr) is not OutputTensor:
raise TypeError('User functions must only return outputs. Instead got:\n' + str(expr))
prev_index.append(ExpressionDAG.expr_index(expr))
expr.proto_expr.io_index = index
output_types.append(TensorType.like(expr))
# reorder declaration of outputs in expression dag
prev_index.sort()
for index, expr in zip(prev_index, output_exprs):
ExpressionDAG.exprs[index] = expr
ExpressionDAG.expr_ids[index] = id(expr)
expression_dag = ExpressionDAG.as_proto()
ExpressionDAG.clear()
expression_dag.name = self.name
if self.grad_function is None or from_gradient:
reordered_dag = None
else:
grad_inputs = []
for t in input_types:
grad_inputs.append(_GradientPlaceholder(t.shape, t.dtype))
for t in output_types:
grad_inputs.append(_GradientPlaceholder(t.shape, t.dtype))
# interpret gradient function
grad_outputs = self.grad_function(*grad_inputs, **constants)
# wrap as list if only one output
try:
len(grad_outputs)
except TypeError:
grad_outputs = [grad_outputs]
# If any gradient function outputs are None, do not create a grad dag
# TODO: allow for incomplete derivative definition
has_none_grads = False
for grad_output in grad_outputs:
if grad_output is None:
has_none_grads = True
if has_none_grads:
reordered_dag = None
else:
# make sure grad outputs are the same type as op inputs
for grad_output_index, grad_output in enumerate(grad_outputs):
if isinstance(grad_output, _Operator) and len(grad_output.output_types) != 1:
raise TypeError('A multi-output operator was returned from a gradient function, '
'but the meaning of this is ambiguous: explicitly index each '
'output from operator: ' +
str(grad_output.name))
cur_input_type = input_types[grad_output_index]
cur_grad_output_type = TensorType.like(_resolve_output(grad_output))
if cur_input_type != cur_grad_output_type:
raise TypeError('Gradient output index ' + str(grad_output_index) + ', with TensorType: ' +
str(cur_grad_output_type) + ' is inconsistent with operator input index ' +
str(grad_output_index) + ', with TensorType: ' + str(cur_input_type))
grad_dag = _build_op_dag(*grad_outputs)
# grad dag may not be dependent on all possible gradient inputs, in which case _build_op_dag
# will not find all possible _GradientPlaceholders in grad_inputs. It may also find the inputs
# in a different order than grad_inputs is arranged. In both of these cases the gradient dag
# should have all possible inputs inserted and re-arranged so that they reflect the length
# and order of grad_inputs. All downstream functions that use the grad dag assume that all
# possible inputs are populated in that order.
found_inputs = grad_dag.inputs
input_remap = {}
for found_index, found_input in enumerate(found_inputs):
grad_index = grad_inputs.index(found_input)
input_remap[found_index] = grad_index
# copy the found proto dag
reordered_dag = lang.OperatorDAG()
reordered_dag.CopyFrom(grad_dag.proto_dag)
# clear the input types and put in the correct full signature of gradient inputs
del reordered_dag.dag_input_types[:]
for grad_input in grad_inputs:
cur_type = grad_input.tensor_type.as_proto()
reordered_dag.dag_input_types.add().CopyFrom(cur_type)
# update the input references to update dag_input_index values
for ref in reordered_dag.references:
for input_ref in ref.input_refs:
if input_ref.is_leaf:
input_ref.dag_input_index = input_remap[input_ref.dag_input_index]
return _Operator(expression_dag, output_types, inputs, reordered_dag, from_gradient, self.name)
def add_grad(self, grad_function):
if self.grad_function is None:
self.grad_function = grad_function
else:
raise ValueError('Gradient function is already defined for operator ' + str(self.name) + '.')
def operator(forbid_none_valued_constants=True, name=None):
    """
    Decorator that turns a function into an OVL operator.

    BUG FIX: the original line began with a stray ``[docs]`` token (Sphinx HTML
    extraction artifact) which made the module unparsable; it has been removed.

    :param forbid_none_valued_constants: when True, calling the operator raises if any
        constant resolves to None.
    :param name: optional operator name; defaults to the decorated function's name.
    :return: a decorator producing an _OpGenerator around the function.
    :raises SyntaxError: at decoration time, if the function reassigns a local that
        holds an OVL variable (must use ``<<=`` instead).
    """
    def wrapper(op_function):
        # Use disassembler to check for reassignments to variables inside the op function
        # define compatibility function for python 2 and 3 bytecodes
        # NOTE(review): the argument decoding below assumes the pre-3.6 bytecode layout
        # (opcode followed by a 2-byte argument); verify before targeting newer interpreters.
        def bytecode_to_int(x):
            if six.PY2:
                return ord(x)
            elif six.PY3:
                return x
        co = op_function.__code__
        code = co.co_code
        linestarts = dict(findlinestarts(co))
        extended_arg = 0
        extended_argj = 0
        linestart_opcode_n = 0
        linestart = 0
        op_last = 0
        variable_names = set()
        # step through op codes
        cur_opcode_n = 0
        while cur_opcode_n < len(code):
            is_assignment = False
            is_variable = False
            has_lshift = False
            cur_opcode = bytecode_to_int(code[cur_opcode_n])
            if cur_opcode_n in linestarts:
                linestart_opcode_n = cur_opcode_n
                linestart = linestarts[cur_opcode_n]
            if cur_opcode == opcode.opmap['STORE_FAST']:
                is_assignment = True
                # scan the instructions since the start of this source line to see
                # whether the stored value came from an OVL 'variable' construction
                prev_opcode_n = linestart_opcode_n
                while prev_opcode_n < cur_opcode_n and not is_variable:
                    prev_opcode = bytecode_to_int(code[prev_opcode_n])
                    prev_opcode_n += 1
                    if prev_opcode >= opcode.HAVE_ARGUMENT:
                        opargj = bytecode_to_int(code[prev_opcode_n]) + \
                            bytecode_to_int(code[prev_opcode_n+1])*256 + extended_argj
                        extended_argj = 0
                        prev_opcode_n += 2
                        if prev_opcode == opcode.EXTENDED_ARG:
                            extended_argj = opargj*65536
                        elif prev_opcode == opcode.opmap['LOAD_ATTR'] or prev_opcode == opcode.opmap['LOAD_GLOBAL']:
                            hasname = prev_opcode in opcode.hasname
                            named_var = co.co_names[opargj] == 'variable'
                            is_variable = hasname and named_var
                # check to see if prior op code is lshift
                has_lshift = op_last == opcode.opmap['INPLACE_LSHIFT']
            cur_opcode_n += 1
            if cur_opcode >= opcode.HAVE_ARGUMENT:
                oparg = bytecode_to_int(code[cur_opcode_n]) + bytecode_to_int(code[cur_opcode_n+1])*256 + extended_arg
                extended_arg = 0
                cur_opcode_n += 2
                if cur_opcode == opcode.EXTENDED_ARG:
                    extended_arg = oparg*65536
                elif cur_opcode in opcode.haslocal:
                    variable_name = co.co_varnames[oparg]
                    func_name = co.co_name
                    if is_assignment:
                        if variable_name in variable_names and not has_lshift:
                            s = ' File "' + co.co_filename + '", line ' + str(linestart)
                            raise SyntaxError('Cannot reassign to symbol "' + variable_name + '" in operator "' +
                                              func_name + '" because it refers to an OVL variable. Use the <<= operator'
                                              ' to assign to OVL variables. \n' + s)
                    elif is_variable:
                        variable_names.add(variable_name)
            op_last = cur_opcode
        op = _OpGenerator(op_function, forbid_none_valued_constants, name)
        op.__name__ = op_function.__name__
        op.__doc__ = op_function.__doc__
        return op
    return wrapper
def gradient(op_function):
    """
    Decorator that attaches a gradient function to an operator previously
    defined with the ``operator`` decorator.

    BUG FIX: the original line began with a stray ``[docs]`` token (Sphinx HTML
    extraction artifact) which made the module unparsable; it has been removed.

    :param op_function: the _OpGenerator the gradient belongs to.
    :return: a decorator that registers the gradient function and returns it.
    :raises TypeError: if op_function is not an _OpGenerator.
    """
    if not isinstance(op_function, _OpGenerator):
        raise TypeError('gradient decorator argument must be a function decorated as an operator')

    def wrapper(grad_function):
        if isinstance(grad_function, _OpGenerator):
            # The gradient is itself an operator: wrap it so that calling it
            # yields the list of its outputs rather than the operator node.
            def resolved(*args, **defaults):
                op = grad_function(*args, **defaults)
                op_outputs = []
                for output_index in range(len(op.output_types)):
                    op_outputs.append(op[output_index])
                return op_outputs
        else:
            resolved = grad_function
        op_function.add_grad(resolved)
        resolved.__name__ = grad_function.__name__
        resolved.__doc__ = grad_function.__doc__
        return resolved
    return wrapper
# Bundle describing a fully-built operator DAG: its protobuf form, the external
# input tensors, the depth-sorted operators, and each operator's gradient dag.
_OperatorDAG = namedtuple('_OperatorDAG', ['proto_dag', 'inputs', 'operators', 'grad_dags'])
def _build_op_dag(*outputs):
    """
    Perform a depth-first traversal over the operator nodes reachable from the
    requested outputs, then sort operators by DAG depth and serialize the result.
    :param outputs: a list of the operator outputs from which to build the dag
    :return: an _OperatorDAG namedtuple (proto_dag, inputs, operators, grad_dags)
    """
    # Parallel lists indexed by discovery order; traverse() mutates all of them.
    ops = []
    op_ids = []
    op_depth = []
    input_indices = []
    dag_inputs = []
    dag_input_ids = []

    def traverse(cur_node):
        # Recursively visit an operator node; returns (id, depth in the dag).
        if not isinstance(cur_node, _Operator):
            raise TypeError()
        cur_id = id(cur_node)
        # add unvisited ops (nodes) to the op list
        if cur_id not in op_ids:
            op_ids.append(cur_id)
            ops.append(cur_node)
            op_depth.append(None)
            input_indices.append(None)
            traverse_cur_index = len(op_ids) - 1
            # tabulate each input tensor (edge) for this op. visit parent ops if inputs come from other ops.
            traverse_cur_input_indices = []
            max_depth = -1
            for cur_input in cur_node.inputs:
                try:
                    # Input produced by another operator: recurse into its parent.
                    resolved = _resolve_output(cur_input)
                    parent = resolved.parent
                    traverse_parent_id, traverse_parent_depth = traverse(parent)
                    max_depth = max(max_depth, traverse_parent_depth)
                    traverse_parent_index = op_ids.index(traverse_parent_id)
                    traverse_output_index = resolved.index
                    traverse_dag_input_index = None
                except TypeError:
                    # Not an operator output: this is an external dag input (leaf).
                    if id(cur_input) not in dag_input_ids:
                        dag_inputs.append(cur_input)
                        dag_input_ids.append(id(cur_input))
                    traverse_parent_index = None
                    traverse_output_index = None
                    traverse_dag_input_index = dag_input_ids.index(id(cur_input))
                traverse_cur_input_indices.append({'parent_index': traverse_parent_index,
                                                   'output_index': traverse_output_index,
                                                   'dag_input_index': traverse_dag_input_index})
            input_indices[traverse_cur_index] = traverse_cur_input_indices
            # Node depth is one more than its deepest parent (leaf-only nodes get 0).
            cur_depth = max_depth + 1
            op_depth[traverse_cur_index] = cur_depth
        else:
            # Already visited: reuse recorded depth.
            traverse_cur_index = op_ids.index(cur_id)
            cur_depth = op_depth[traverse_cur_index]
        return cur_id, cur_depth

    # Resolve each requested dag output to its producing op and output slot.
    output_indices = []
    for dag_output in outputs:
        cur_output = _resolve_output(dag_output)
        parent_id, parent_depth = traverse(cur_output.parent)
        parent_index = op_ids.index(parent_id)
        output_index = cur_output.index
        output_indices.append({'parent_index': parent_index, 'output_index': output_index})
    # sort ops according to their DAG depth
    new_to_old = np.argsort(op_depth)
    old_to_new = np.argsort(new_to_old)
    sorted_ops = []
    sorted_input_indices = []
    for new_index, old_index in enumerate(new_to_old):
        sorted_ops.append(ops[old_index])
        sorted_input_indices.append([])
        cur_input_indices = sorted_input_indices[-1]
        # Remap each input's parent index into the sorted order.
        for cur_index in input_indices[old_index]:
            parent_index = cur_index['parent_index']
            output_index = cur_index['output_index']
            dag_input_index = cur_index['dag_input_index']
            if parent_index is not None:
                parent_index = old_to_new[parent_index]
            cur_input_indices.append({'parent_index': parent_index,
                                      'output_index': output_index,
                                      'dag_input_index': dag_input_index})
    # update parent indices for the outputs
    for output_index in output_indices:
        output_index['parent_index'] = old_to_new[output_index['parent_index']]
    # create the protobuf representation
    op_dag = lang.OperatorDAG()
    for op in sorted_ops:
        op_dag.operators.add().CopyFrom(op.expression_dag)
    for dag_input in dag_inputs:
        proto_type = TensorType.like(dag_input).as_proto()
        op_dag.dag_input_types.add().CopyFrom(proto_type)
    for input_index in sorted_input_indices:
        ref_list = lang.OperatorDAG.OperatorInputReferences()
        for cur_ref in input_index:
            proto_ref = lang.OperatorDAG.OperatorInputReference()
            if cur_ref['parent_index'] is None:
                # Leaf reference: points at an external dag input.
                proto_ref.is_leaf = True
                proto_ref.dag_input_index = cur_ref['dag_input_index']
            else:
                # Internal reference: points at another operator's output.
                proto_ref.op_index = int(cur_ref['parent_index'])
                proto_ref.op_output_index = int(cur_ref['output_index'])
            ref_list.input_refs.add().CopyFrom(proto_ref)
        op_dag.references.add().CopyFrom(ref_list)
    for output_index in output_indices:
        proto_ref = lang.OperatorDAG.DAGOutputReference()
        proto_ref.op_index = int(output_index['parent_index'])
        proto_ref.op_output_index = int(output_index['output_index'])
        op_dag.dag_outputs.add().CopyFrom(proto_ref)
    # Gradient dags are carried alongside in the same (sorted) operator order.
    grad_dags = []
    for op in sorted_ops:
        grad_dags.append(op.grad_dag)
    dag = _OperatorDAG(proto_dag=op_dag, inputs=dag_inputs, operators=sorted_ops, grad_dags=grad_dags)
    return dag
def _get_expr_indices(expr_dag, expr_code):
"""
Get indices of all read/write expression in the expression dag which is represented as a list.
:param expr_dag: The expression dag to search for read/write expressions.
:param expr_code: The expression code.
:return: A list of indices with all expressions that match the expression code.
"""
expr_indices = []
for iExp, expr in enumerate(expr_dag.expressions):
if expr.code == expr_code:
expr_indices.append(iExp)
return expr_indices
def _get_output_indices(expr_dag):
    """
    Find indices of expressions that are outputs in the expression dag.
    :param expr_dag: expression dag.
    :return: a list of output indices, in dag order.
    """
    # Thin wrapper over the generic code-matching helper.
    return _get_expr_indices(expr_dag, lang.OUTPUT)
def _get_input_indices(expr_dag):
    """
    Find indices of expressions that are inputs in the expression dag.
    :param expr_dag: the expression dag.
    :return: a list of input indices, in dag order.
    """
    # Thin wrapper over the generic code-matching helper.
    return _get_expr_indices(expr_dag, lang.INPUT)
def _get_position_index(expr_dag):
    """
    Returns the position index within this expr_dag. Checks that there is one and only one POSITION expression.
    :param expr_dag: The expression dag.
    :return: The index of the position within the expression dag.
    """
    position_indices = _get_expr_indices(expr_dag, lang.POSITION)
    # A well-formed dag defines the worker position exactly once.
    assert len(position_indices) == 1
    return position_indices[0]
def _get_output_io_index(expr_dag, out_expr_index):
    """
    Assumes that OUTPUT expressions appear in return order.
    :param expr_dag: The expression dag.
    :param out_expr_index: The index of the output in the expression dag.
    :return: The output argument index (0-based position among OUTPUT expressions
        at or before out_expr_index).
    """
    assert expr_dag.expressions[out_expr_index].code == lang.OUTPUT
    io_index = -1
    # BUG FIX: the range previously stopped at 1 (`range(out_expr_index, 0, -1)`),
    # so an OUTPUT expression sitting at dag index 0 was never counted.
    for i in range(out_expr_index, -1, -1):
        if expr_dag.expressions[i].code == lang.OUTPUT:
            io_index += 1
    return io_index
def _get_output_shape(expr_dag, out_arg_index):
    """
    Returns the shape of the 'out_arg_index'-th output in the expression dag.
    :param expr_dag: expression dag
    :param out_arg_index: output index. Must be >=0 and < number of outputs.
    :return: shape of the output.
    """
    output_indices = _get_output_indices(expr_dag)
    assert out_arg_index < len(output_indices)
    output_expr = expr_dag.expressions[output_indices[out_arg_index]]
    return output_expr.tensor_type.shape
def _get_tensor_read_indices(expr_dag):
    """
    Get indices for the READ_TENSOR expression code.
    :param expr_dag: The expression dag.
    :return: A list of indices with a READ_TENSOR expression.
    """
    # Thin wrapper over the generic code-matching helper.
    return _get_expr_indices(expr_dag, lang.READ_TENSOR)
def _get_tensor_assign_indices(expr_dag):
    """
    Get indices for the ASSIGN_TENSOR expression code.
    :param expr_dag: The expression dag.
    :return: A list of indices with ASSIGN_TENSOR expressions.
    """
    # Thin wrapper over the generic code-matching helper.
    return _get_expr_indices(expr_dag, lang.ASSIGN_TENSOR)
def _get_indices_connected_to_expr_index(exp_dag, expr_indices, expr_index):
"""
Filter 'expr_indices' if they can be traced back to 'expr_index'. Typical use cases are:
Does a READ_TENSOR expression have connection to an input or output tensor?
Does an ASSIGN_TENSOR expression have connection to an output tensor?
:param exp_dag: The expression dag.
:param expr_indices: Expression indices to be tested if they have a connection to expr_index.
:param expr_index: An expr_index in the dag.
:return: A sub-list of 'expr_indices' that contains only those indices of expressions that are connected to the
expression at the index 'expr_index'. Returned indices may not be in the same order than indices in 'expr_indices'.
"""
connected_indices = set()
indices = []
for index in expr_indices:
indices.append(index)
while len(indices) > 0:
i = indices.pop(0)
for op_index in exp_dag.references[i].operand_indices:
if op_index == expr_index:
connected_indices.add(index)
# We can stop here, because we traced the op_index back to expr_index.
indices[:] = []
break
else:
indices.append(op_index)
return list(connected_indices)
def _get_indices_connected_to_expr_indices(exp_dag, from_expr_indices, to_expr_indices):
"""
Filters the from_expr_indices down toward indices that are connected to any index within to_expr_indices.
:param exp_dag: The expression dag.
:param from_expr_indices: Test these indices for a connection with any of the to_expr_indices.
:param to_expr_indices: A set of expression indices.
:return: A set with all indices of from_expr_indices that are connected. This is a subset of from_expr_indices or
equal to that set.
"""
connected_indices = set()
indices = []
for index in from_expr_indices:
indices.append(index)
while len(indices) > 0:
i = indices.pop(0)
for op_index in exp_dag.references[i].operand_indices:
if op_index in to_expr_indices:
connected_indices.add(index)
# We can stop here, because we traced the op_index back to expr_index.
indices[:] = []
break
else:
indices.append(op_index)
return list(connected_indices)
def _get_tensor_read_indices_for_expr_index(expr_dag, expr_index):
    """
    Returns a list of all TENSOR_READs for the expr_index, which usually is the index of an input/output tensor. An
    input tensor can only be read but not assigned to in ovl.
    :param expr_dag: The expression dag.
    :param expr_index: Usually the index in the expression dag of an input/output tensor.
    :return: A list of all expression indices that are TENSOR_READs and are connected to expr_index.
    """
    # Gather all READ_TENSOR expressions, then keep only those tracing back to expr_index.
    read_indices = _get_tensor_read_indices(expr_dag)
    return _get_indices_connected_to_expr_index(expr_dag, read_indices, expr_index)
def _get_indices_of_sub_expr_wout_position(expr_dag, expr_index):
    """
    Returns a set of all indices reached by traversing the expression dag from expr_index toward operands,
    stopping at (and excluding) any POSITION expression.
    :param expr_dag: The expression dag.
    :param expr_index: The start index for the traversal toward inputs in the expression dag.
    :return: A set with all indices of the traversal.
    """
    visited = set()
    pending = [expr_index]
    while pending:
        cur = pending.pop(0)
        # Do not descend through (or record) the position expression.
        if expr_dag.expressions[cur].code == lang.POSITION:
            continue
        if cur in visited:
            continue
        visited.add(cur)
        pending.extend(expr_dag.references[cur].operand_indices)
    return visited
def _get_indices_of_sub_expr(expr_dag, expr_index):
"""
Get all indices in expression dag starting from 'expr_index' following input references.
:param expr_dag: The expression dag.
:param expr_index: Start the traversal toward inputs from here.
:return: A set with all indices of the traversal starting from expr_index.
"""
sub_expr_indices = set()
indices = [expr_index]
while len(indices) > 0:
i = indices.pop(0)
if i not in sub_expr_indices:
sub_expr_indices.add(i)
indices.extend(expr_dag.references[i].operand_indices)
return sub_expr_indices
def _get_tensor_assign_indices_for_expr_index(expr_dag, expr_index):
    """
    Returns a list of all TENSOR_ASSIGNs for the expression dag, which usually is an assign to an OUTPUT tensor.
    One output can have multiple writes but never a read. That is not allowed in ovl.
    :param expr_dag: The expression dag.
    :param expr_index: Usually the index of an OUTPUT tensor.
    :return: A list of all expression indices that are TENSOR_ASSIGN's and are connected to 'expr_index'.
    """
    # Gather all ASSIGN_TENSOR expressions, then keep only those tracing back to expr_index.
    write_indices = _get_tensor_assign_indices(expr_dag)
    return _get_indices_connected_to_expr_index(expr_dag, write_indices, expr_index)
def _match_index_in_expr_dags(expr_dag0, expr_index0, expr_dag1, expr_index1):
    """
    Compare the indexing by matching the sub-expression trees in 'expr_dag0' and 'expr_dag1'. This resolves complex
    indexing including mod, shift, etc.
    :param expr_dag0: The first expression dag.
    :param expr_index0: The index that refers to READ_TENSOR or ASSIGN_TENSOR in the first expression dag.
    :param expr_dag1: The second expression dag.
    :param expr_index1: The index that refers to READ_TENSOR or ASSIGN_TENSOR in the second expression dag.
    :return: True if the codes of all expressions match, otherwise False.
    :raises TypeError: If a CONST_SCALAR expression carries an unknown data type.
    """
    op_indices0 = expr_dag0.references[expr_index0].operand_indices
    op_indices1 = expr_dag1.references[expr_index1].operand_indices
    # Must have position 1 in operand_indices.
    assert len(op_indices0) > 1
    assert len(op_indices1) > 1
    index0 = op_indices0[1]
    index1 = op_indices1[1]
    # Initialize the indices with the index expression in both sub-trees.
    indices = [(index0, index1)]
    while len(indices) > 0:
        index0, index1 = indices.pop(0)
        expr0 = expr_dag0.expressions[index0]
        expr1 = expr_dag1.expressions[index1]
        # Check for the same expression code.
        if expr0.code != expr1.code:
            return False
        # For constants check that they have the same value.
        if expr0.code == lang.CONST_SCALAR:
            dtype0 = expr0.dtype
            dtype1 = expr1.dtype
            if dtype0 != dtype1:
                return False
            if dtype0 == lang.UINT64:
                if expr0.uint64_data != expr1.uint64_data:
                    return False
            elif dtype0 == lang.INT64:
                if expr0.sint64_data != expr1.sint64_data:
                    return False
            elif dtype0 == lang.FLOAT64:
                if expr0.double_data != expr1.double_data:
                    return False
            elif dtype0 == lang.FLOAT32 or dtype0 == lang.FLOAT16:
                # FLOAT16 constants are stored in float_data as well (see _add_variable_const_expr);
                # previously FLOAT16 fell through to the unknown-type error.
                if expr0.float_data != expr1.float_data:
                    return False
            elif dtype0 == lang.UINT32 or dtype0 == lang.UINT16 or dtype0 == lang.UINT8:
                if expr0.uint32_data != expr1.uint32_data:
                    return False
            elif dtype0 == lang.INT32 or dtype0 == lang.INT16 or dtype0 == lang.INT8:
                if expr0.sint32_data != expr1.sint32_data:
                    return False
            else:
                # Bug fix: 'str' + enum int raised a TypeError while building the message itself.
                raise TypeError('Cannot match unknown data type %s' % str(dtype0))
        # Continue with the operands of this expression for each dag toward inputs.
        op_indices0 = expr_dag0.references[index0].operand_indices
        op_indices1 = expr_dag1.references[index1].operand_indices
        # The number of referenced inputs must match.
        if len(op_indices0) != len(op_indices1):
            return False
        # Add all operand pairs to the queue of to-be-processed expressions.
        for pair in zip(op_indices0, op_indices1):
            indices.append(pair)
    return True
def _eliminate_duplicates(l):
"""
Removes duplicates from a list while maintaining the order of the elements (stable).
:param l: The input list.
:return: List with removed duplicates.
"""
# TODO: Improve this implementation of making lists unique while maintaining the original order (stable).
duplicate = [False] * len(l)
for i1 in range(len(l)):
e = l[i1]
for i2 in range(i1+1,len(l)):
if e == l[i2]:
duplicate[i2] = True
unique = []
for i, d in zip(range(len(duplicate)), duplicate):
if not d:
unique.append(l[i])
return unique
class _MergeRef(namedtuple('_MergeRef', ['to_op_index', 'to_in_expr_index', 'to_in_arg_index', 'from_op_index',
'from_out_expr_index', 'from_out_arg_index'])):
"""
Merge reference refer to indices in the operation dag. The 'to_op_index' and 'from_op_index' encode a dependency
between an output tensor of the from-op that is used as input tensor in the to-op. The 'to_in_expr_index' refers to
the INPUT expression definition within the expression dag of the to-op. The 'to_in_arg_index' contains the input
argument index as defined in the signature of the to-op. The 'from_out_expr_index' refers to the OUTPUT expression
definition within the expression dag of the from-op. The 'from_out_arg_index' is the output argument index of the
tensor in the from-op.
:param to_op_index: The index of the to-op in the operator dag.
:param to_in_expr_index: The index of the INPUT expression defining the tensor in the expression dag of the to-op.
:param to_in_arg_index: The index of the input argument of this tensor in the to-op.
:param from_op_index: The index of the from-op in the operator dag.
:param from_out_expr_index: The index of the OUTPUT expression defining the tensor in the expression dag of the
from-op.
:param from_out_arg_index: The index of the output argument of the from-op.
:return: A merge reference.
"""
__slots__ = () # empty slots
def same(self, ref):
"""
Two merge reference are the same if all their indices match. This method does NOT test for object equality.
:param ref: The other merge reference.
:return: True if they are the same, otherwise false.
"""
return self.to_op_index == ref.to_op_index \
and self.to_in_expr_index == ref.to_in_expr_index \
and self.to_in_arg_index == ref.to_in_arg_index \
and self.from_op_index == ref.from_op_index \
and self.from_out_expr_index == ref.from_out_expr_index \
and self.from_out_arg_index == ref.from_out_arg_index
def _get_merge_refs_for_op_dag(proto_op_dag):
    """
    Creates a list of operators with their arguments that can be merged into single operators. Notice that merge
    information is given per argument.
    :param proto_op_dag: the protobuf representation of the operator dag.
    :return: A duplicate-free list of _MergeRef entries, one per (to-op input argument, from-op output
    argument) pair that is safe to merge.
    """
    ops = proto_op_dag.operators  # get operators, each operator contains an expression dag
    outs = proto_op_dag.dag_outputs
    refs = proto_op_dag.references  # references of the operator dag
    # Walk the dag from each output to inputs and compute information about merging of ops.
    pending_ops = []
    staged = set()
    merge_refs = []  # Holds the merge references for mergeable op pairs.
    # For each output tensor of the operator dag.
    for dag_output in outs:
        output_op_index = dag_output.op_index  # operator index of this output
        pending_ops.append(output_op_index)  # schedule this operator for the walk
        staged.clear()  # remove all staged operator indices.
        staged.add(output_op_index)  # mark this operator index as staged.
        while len(pending_ops) > 0:
            to_op_index = pending_ops.pop(0)
            to_exp_dag = ops[to_op_index]
            to_ref = refs[to_op_index]
            to_workgroup_shape = to_exp_dag.workgroup_shape
            # Loop variable renamed from 'input': it shadowed the ovl 'input' imported at module scope.
            for to_in_arg_index, in_ref in enumerate(to_ref.input_refs):
                from_op_index = in_ref.op_index
                # Do not consider leaf nodes.
                if in_ref.is_leaf:
                    continue
                # Append if not staged.
                if from_op_index not in staged:
                    staged.add(from_op_index)
                    pending_ops.append(from_op_index)
                expr = to_exp_dag.expressions[in_ref.dag_input_index]
                from_exp_dag = ops[from_op_index]
                from_workgroup_shape = from_exp_dag.workgroup_shape
                input_shape_of_out = expr.tensor_type.shape
                output_indices = _get_output_indices(from_exp_dag)
                from_out_expr_index = output_indices[in_ref.op_output_index]
                output_shape_of_in = _get_output_shape(from_exp_dag, in_ref.op_output_index)
                # Merging requires identical workgroup shapes of both ops.
                match = to_workgroup_shape == from_workgroup_shape
                if not match:
                    logger.info('Non-matching workgroup shapes for %s input [%d] and %s output [%d].'
                                % (to_exp_dag.name, to_in_arg_index, from_exp_dag.name, in_ref.op_output_index))
                    continue
                #assert input_shape_of_out == output_shape_of_in # Must always match in well defined dags.
                # Because of broadcasting that we currently do not handle.
                match = input_shape_of_out == output_shape_of_in
                if not match:
                    logger.info('Non-matching tensor shapes (broadcasting) for %s input [%d] and %s output [%d].'
                                % (to_exp_dag.name, to_in_arg_index, from_exp_dag.name, in_ref.op_output_index))
                    continue
                # get the indexing pattern for the output to this input.
                tensor_write_indices = _get_tensor_assign_indices_for_expr_index(from_exp_dag, from_out_expr_index)
                match = len(tensor_write_indices) == 1
                if not match:
                    logger.info('Multiple writes to %s output [%d]' % (from_exp_dag.name, in_ref.op_output_index))
                    continue
                # if there are multiple different write patterns we cannot merge.
                tensor_write_index = tensor_write_indices[0]
                tensor_read_indices = _get_tensor_read_indices_for_expr_index(to_exp_dag, in_ref.dag_input_index)
                for read_index in tensor_read_indices:
                    match = _match_index_in_expr_dags(from_exp_dag, tensor_write_index, to_exp_dag, read_index)
                    if not match:
                        break
                if not match:
                    logger.info('Non-matching read/write index pattern for %s input [%d] and %s output [%d]'
                                % (to_exp_dag.name, to_in_arg_index, from_exp_dag.name, in_ref.op_output_index))
                    continue
                merge_refs.append(_MergeRef(to_op_index=to_op_index,
                                            to_in_expr_index=in_ref.dag_input_index,
                                            to_in_arg_index=to_in_arg_index,
                                            from_op_index=from_op_index,
                                            from_out_expr_index=from_out_expr_index,
                                            from_out_arg_index=in_ref.op_output_index))
    # Eliminate duplicates (the same op pair can be visited from several dag outputs).
    return _eliminate_duplicates(merge_refs)
def _group_merge_refs(proto_op_dag, merge_refs):
"""
Takes the protbuf representation of the operator dag together with the merge references and converts them into
grouped merge references, which is pairs of (to-op-index, from-op-index).
Two ops can be merged if:
- All inputs of the to-op come from one and the same from-op.
- OR inputs can come from external inputs.
:param proto_op_dag: the protobuf representation of the operator dag.
:param merge_refs: merge references.
:return: pairs of (to-op-index, from-op-index).
"""
# Create a set with triplets (to_op_index, from_op_index, from_out_arg_index) as elements.
merge_group_info = set()
for merge_ref in merge_refs:
merge_group_info.add((merge_ref.to_op_index, merge_ref.from_op_index, merge_ref.from_out_arg_index))
group_merge_refs = []
for i, reference in enumerate(proto_op_dag.references):
if len(reference.input_refs) == 0:
continue
from_op_index = reference.input_refs[0].op_index
# Input is in refs and has an reference as leaf node with op_index = 0.
if from_op_index == i:
continue
to_op_index = i
do_group = True
for in_ref in reference.input_refs:
# If ref is an external input, then continue.
if in_ref.is_leaf:
continue
# The input comes from another op, we cannot merge in this case.
if in_ref.op_index != from_op_index:
do_group = False
break
# This input tensor cannot be merged because it does not appear in the merge info.
from_out_arg_index = in_ref.op_output_index
if (to_op_index, from_op_index, from_out_arg_index) not in merge_group_info:
do_group = False
break
# If we can group append the tuple to the list.
if do_group:
group_merge_refs.append((to_op_index, from_op_index))
return group_merge_refs
# Index bookkeeping produced by _get_indices_info and consumed by _merge_expr_dags. Fields ending in
# '_io' hold input/output *argument* positions; the others hold *expression* indices within the dags.
# NOTE(review): the field name 'input_to_ouput_index' (sic, missing 't') is referenced throughout this
# module; renaming it would break those call sites, so the misspelling is intentionally kept.
_IndicesInfo = namedtuple('_IndicesInfo', ['used_only_in_to', 'used_only_in_to_io',
                                           'used_in_general', 'used_in_general_io',
                                           'external_inputs_for_to', 'external_inputs_for_to_io',
                                           'outputs_removed_in_from',
                                           'new_out_indices', 'input_to_ouput_index'])
def _get_indices_info(proto_op_dag, to_op_index, from_op_index):
    """
    Returns index information about:
    - ASSIGN_TENSOR indices within the from-operator that are used only in the to-operator.
    - ASSIGN_TENSOR indices within the from-operator that are used in the to-operator and other operators.
    - Their associated indices as they appear in the listing of input/output arguments for the to-operator and
    from-operator.
    - The INPUT indices for any external inputs of the to-operator.
    - The updated output indices according to the VARIABLE definitions within the from-operator as they are used within
    the merged operator for the to-operator reads.
    - An input to output index mapping.
    :param proto_op_dag: The protobuf description of an operator dag.
    :param to_op_index: The to-operator index within the operator dag.
    :param from_op_index: The from-operator index within the operator dag.
    :return: Index information.
    """
    references = proto_op_dag.references
    ops = proto_op_dag.operators
    outs = proto_op_dag.dag_outputs
    # Output argument positions of the from-op that feed inputs of the to-op.
    used_indices_io = set()
    # Input argument positions of the to-op that are leaves, i.e. external dag inputs.
    external_inputs_for_to_io = set()
    external_outputs = dict()  # Key is the operator index and the value is the set of output indices.
    for out in outs:
        if out.op_index not in external_outputs:
            external_outputs[out.op_index] = set()
        external_outputs[out.op_index].add(out.op_output_index)
    for i, ref in enumerate(references[to_op_index].input_refs):
        if ref.op_index == from_op_index:
            used_indices_io.add(ref.op_output_index)
        if ref.is_leaf:
            external_inputs_for_to_io.add(i)
    # Output tensors of the from-op could be used by ops other than the to-op.
    used_in_general_io = set()
    for i, reference in enumerate(references):
        if i == to_op_index or i == from_op_index:
            continue
        for ref in reference.input_refs:
            if ref.op_index == from_op_index \
                    and ref.op_output_index in used_indices_io:
                used_in_general_io.add(ref.op_output_index)
    # Or output tensors of the from-op can be used as external output.
    if from_op_index in external_outputs:
        used_in_general_io |= external_outputs[from_op_index]
    # Outputs consumed exclusively by the to-op can be internalized by the merge.
    used_only_in_to_io = used_indices_io - used_in_general_io
    out_indices = _get_output_indices(ops[from_op_index])
    used_only_in_to = set()
    used_in_general = set()
    # Mapping of from-op output indices to their new output index.
    new_out_indices = list()
    new_out_index = 0
    for i in range(0, len(out_indices)):
        if i in used_only_in_to_io:
            used_only_in_to.add(out_indices[i])
            # Internalized output: it loses its output slot in the merged op; 0 is a placeholder.
            new_out_indices.append(0)
        elif i in used_in_general_io:
            used_in_general.add(out_indices[i])
            new_out_indices.append(new_out_index)
            new_out_index += 1
        else:
            # NOTE(review): this branch is identical to the one above — presumably intentional, since
            # only outputs used solely by the to-op are removed from the numbering; confirm.
            new_out_indices.append(new_out_index)
            new_out_index += 1
    # Mapping from input to output indices.
    input_to_output_index = dict()
    for i, ref in enumerate(references[to_op_index].input_refs):
        if ref.op_index == from_op_index:
            input_to_output_index[i] = ref.op_output_index
    # Translate external input argument positions into INPUT expression indices of the to-op.
    external_inputs_for_to = set()
    input_indices = _get_input_indices(ops[to_op_index])
    for i, input_index in enumerate(input_indices):
        if i in external_inputs_for_to_io:
            external_inputs_for_to.add(input_index)
    outputs_removed_in_from = set()
    outputs_removed_in_from |= used_only_in_to_io
    if from_op_index in external_outputs:
        # Exclude all those indices that are used as external output.
        outputs_removed_in_from -= external_outputs[from_op_index]
    return _IndicesInfo(used_only_in_to=used_only_in_to,
                        used_only_in_to_io=used_only_in_to_io,
                        used_in_general=used_in_general,
                        used_in_general_io=used_in_general_io,
                        external_inputs_for_to=external_inputs_for_to,
                        external_inputs_for_to_io=external_inputs_for_to_io,
                        outputs_removed_in_from=outputs_removed_in_from,
                        new_out_indices=new_out_indices,
                        input_to_ouput_index=input_to_output_index)
def _add_variable_const_expr(merged_expr_dag, old_expr):
    """
    Adds a VARIABLE expression together with a CONST_SCALAR expression that supplies its initial value (0).
    :param merged_expr_dag: The merged expression dag that receives the two new expressions.
    :param old_expr: The old expression used to read-out the data type.
    :return: None.
    :raises TypeError: If the data type of 'old_expr' is not supported.
    """
    old_dtype = old_expr.tensor_type.dtype
    const_scalar = lang.Expression()
    const_scalar.code = lang.CONST_SCALAR
    const_scalar.dtype = old_dtype
    var = lang.Expression()
    var.code = lang.VARIABLE
    var.dtype = old_dtype
    # Set the initial value in the data field matching the dtype.
    if old_dtype == lang.FLOAT64:
        const_scalar.double_data.append(0)
    elif old_dtype == lang.FLOAT32 or old_dtype == lang.FLOAT16:
        const_scalar.float_data.append(0)
    elif old_dtype == lang.UINT64:
        const_scalar.uint64_data.append(0)
    elif old_dtype == lang.INT64:
        const_scalar.sint64_data.append(0)
    elif old_dtype == lang.UINT32 or old_dtype == lang.UINT16 or old_dtype == lang.UINT8:
        const_scalar.uint32_data.append(0)
    elif old_dtype == lang.INT32 or old_dtype == lang.INT16 or old_dtype == lang.INT8:
        const_scalar.sint32_data.append(0)
    else:
        # Bug fix: concatenating the enum int to a str raised a TypeError instead of the intended message.
        raise TypeError('Tried to add variable for unknown type %s' % str(old_dtype))
    head_expr = merged_expr_dag.expressions.add()
    head_expr.CopyFrom(const_scalar)
    merged_expr_dag.references.add()
    head_expr = merged_expr_dag.expressions.add()
    head_expr.CopyFrom(var)
    # The VARIABLE expression refers to the CONST_SCALAR expression.
    merged_expr_dag.references.add().operand_indices.extend([len(merged_expr_dag.references) - 2])
def _merge_expr_dags(to_expr_dag, from_expr_dag, indices_info):
    """
    Merges two expression dags. Two expression dags are merged using the following:
    A) For the 'from_exp_dag' we do:
    - For tensors not used in the to-op leave them as is.
    - For tensors ONLY used in the to-op we replace the OUTPUT expression by VARIABLE expression and the
    ASSIGN_TENSOR expression by an ASSIGN_VARIABLE expression.
    - For tensors used in the to-op AND in another op we add a VARIABLE expression and an ASSIGN_VARIABLE expression.
    B) For the 'to_expr_dag' we do:
    - We replace READ_TENSOR expressions that refer to reads from tensors in the to-op through a READ_VARIABLE
    expression. Notice that there are several other READ_TENSOR expressions, e.g. reading the POSITION value or
    reading from external inputs that can not be replaced.
    :param to_expr_dag: The expression dag of the to-operator.
    :param from_expr_dag: The expression dag of the from-operator.
    :param indices_info: The index information used for the merge.
    :return: The merged expression dag.
    """
    used_only_in_to = indices_info.used_only_in_to
    used_in_general = indices_info.used_in_general
    input_to_output_index = indices_info.input_to_ouput_index
    external_inputs_for_to = indices_info.external_inputs_for_to
    merged_expr_dag = lang.ExpressionDAG()
    assign_indices = _get_tensor_assign_indices(from_expr_dag)
    # Expression indices that must NOT be copied into the merged dag.
    del_indices = set()
    only_assign_indices = _get_indices_connected_to_expr_indices(from_expr_dag, assign_indices, used_only_in_to)
    general_assign_indices = _get_indices_connected_to_expr_indices(from_expr_dag, assign_indices, used_in_general)
    for only_index in only_assign_indices:
        # Purge the index of the ASSIGN_TENSOR expression but not the ASSIGN_TENSOR expression itself.
        index_index = from_expr_dag.references[only_index].operand_indices[1]
        del_indices |= _get_indices_of_sub_expr_wout_position(from_expr_dag, index_index)
    # Offset per index of expressions of from-op's expression dag.
    # offsets[i] + i gives the expression's new index in the merged dag.
    offsets = np.zeros(len(from_expr_dag.expressions), dtype=int)
    offset = 0
    # Maps a from-op output argument position to the expression index of its VARIABLE in the merged dag.
    output_to_expr_index = dict()
    # Counters for the number of inputs and outputs of the merged op.
    input_count = 0
    output_count = 0
    # ******************************************************************************************************************
    # For the 'from' expression dag do:
    # - For tensors not used in the 'to' op leave everything as is.
    # - For tensors ONLY used in the 'to' op replace the OUTPUT by VARIABLE and ASSIGN_TENSOR by ASSIGN_VARIABLE.
    # - For tensors used in the 'to' op AND in another op add VARIABLE and ASSIGN_VARIABLE.
    # ******************************************************************************************************************
    for i, expr in enumerate(from_expr_dag.expressions):
        # Do not copy expressions contained in the del_indices set.
        if i in del_indices:
            offset -= 1
            offsets[i] = offset
            continue
        # Replace the OUTPUT expression by a VARIABLE expression.
        if i in used_only_in_to:
            _add_variable_const_expr(merged_expr_dag, expr)
            # _add_variable_const_expr adds two expressions (CONST_SCALAR + VARIABLE), hence offset += 1
            # net of the one OUTPUT expression that is being replaced.
            offset += 1
            io = _get_output_io_index(from_expr_dag, i)
            output_to_expr_index[io] = len(merged_expr_dag.expressions) - 1
            offsets[i] = offset
            continue
        # Replace the ASSIGN_TENSOR expression by an ASSIGN_VARIABLE expression.
        if i in only_assign_indices:
            head_expr = merged_expr_dag.expressions.add()
            var_assign = lang.Expression()
            var_assign.code = lang.ASSIGN_VARIABLE
            head_expr.CopyFrom(var_assign)
            # Operand 0 is the assigned tensor/variable, operand 2 the assigned value; operand 1 (the
            # index expression) was purged via del_indices above.
            i0 = from_expr_dag.references[i].operand_indices[0]
            i1 = from_expr_dag.references[i].operand_indices[2]
            operand_indices = [int(offsets[i0]) + i0, int(offsets[i1]) + i1]
            merged_expr_dag.references.add().operand_indices.extend(operand_indices)
            offsets[i] = offset
            continue
        # Copy the original expression.
        head_expr = merged_expr_dag.expressions.add()
        head_expr.CopyFrom(from_expr_dag.expressions[i])
        # Set the io_index in the sequence of the INPUT definitions. This assumes that input definitions appear
        # according to the order in the signature.
        if head_expr.code == lang.INPUT:
            head_expr.io_index = input_count
            input_count += 1
        # Set the io_index in the sequence of the OUTPUT definitions. This assumes that output definitions appear
        # in the order of the return statement.
        if head_expr.code == lang.OUTPUT:
            head_expr.io_index = output_count
            output_count += 1
        # Set updated references for the original expression.
        operand_indices = []
        for iRef in from_expr_dag.references[i].operand_indices:
            operand_indices.append(int(offsets[iRef]) + iRef)
        merged_expr_dag.references.add().operand_indices.extend(operand_indices)
        # Add a VARIABLE expression to the OUTPUT expression.
        if i in used_in_general:
            # Set the offset HERE as opposed to the end to refer to the OUTPUT expression and not the VARIABLE
            # expression!
            offsets[i] = offset
            _add_variable_const_expr(merged_expr_dag, expr)
            offset += 2
            io = _get_output_io_index(from_expr_dag, i)
            output_to_expr_index[io] = len(merged_expr_dag.expressions) - 1
            continue
        # Add an ASSIGN_VARIABLE.
        if i in general_assign_indices:
            # Set the offset HERE as opposed to the end to refer to the ASSIGN_TENSOR expression and not the
            # ASSIGN_VARIABLE expression!
            offsets[i] = offset
            head_expr = merged_expr_dag.expressions.add()
            var_assign = lang.Expression()
            var_assign.code = lang.ASSIGN_VARIABLE
            head_expr.CopyFrom(var_assign)
            i0 = from_expr_dag.references[i].operand_indices[0]
            i1 = from_expr_dag.references[i].operand_indices[2]
            # Find output argument index for output at expression index i0.
            io = _get_output_io_index(from_expr_dag, i0)
            operand_indices = [output_to_expr_index[io], int(offsets[i1]) + i1]
            merged_expr_dag.references.add().operand_indices.extend(operand_indices)
            offset += 1
            continue
        # In all other cases set the offset here.
        offsets[i] = offset
    # ******************************************************************************************************************
    # For the 'to' expression do:
    # - Replace READ_TENSOR through the corresponding VARIABLE_READ
    # Read tensors that are excluded:
    # - Reads from external input variables.
    # - Reads that appear within ASSIGN_TENSOR statements.
    # ******************************************************************************************************************
    to_pos_index = _get_position_index(to_expr_dag)
    input_indices = _get_input_indices(to_expr_dag)
    # Maps an INPUT expression index of the to-op to its input argument position.
    expr_to_input_index = dict()
    for input_index in input_indices:
        expr_to_input_index[input_index] = to_expr_dag.expressions[input_index].io_index
    input_indices = set(input_indices)
    # Keep external inputs; only inputs fed by the from-op are dropped.
    input_indices -= external_inputs_for_to
    del_indices = set()
    # The merged dag already has a POSITION expression from the from-op part.
    del_indices.add(to_pos_index)
    del_indices |= input_indices
    # Get all TENSOR_READ indices.
    read_indices = set(_get_tensor_read_indices(to_expr_dag))
    for read_index in read_indices:
        tensor_input_index = to_expr_dag.references[read_index].operand_indices[0]
        if tensor_input_index in input_indices:
            # Drop the read and its index computation; the value comes from a VARIABLE instead.
            index_index = to_expr_dag.references[read_index].operand_indices[1]
            del_indices |= _get_indices_of_sub_expr_wout_position(to_expr_dag, index_index)
            del_indices.add(read_index)
    # Do not remove indices that are used in TENSOR_ASSIGN statements.
    assign_indices = _get_tensor_assign_indices(to_expr_dag)
    for assign_index in assign_indices:
        index_index = to_expr_dag.references[assign_index].operand_indices[1]
        assign_index_tree = _get_indices_of_sub_expr_wout_position(to_expr_dag, index_index)
        del_indices -= assign_index_tree
        read_indices -= assign_index_tree
    # Do not replace READ_TENSOR from external inputs.
    delete_from_read = set()
    for read_index in read_indices:
        tensor_input_index = to_expr_dag.references[read_index].operand_indices[0]
        if tensor_input_index in external_inputs_for_to:
            delete_from_read |= _get_indices_of_sub_expr_wout_position(to_expr_dag, read_index)
    read_indices -= delete_from_read
    merge_pos_index = _get_position_index(merged_expr_dag)
    # Expressions of the to-op are appended after the from-op part; start all offsets there.
    offset = len(merged_expr_dag.expressions)
    offsets = np.empty(len(to_expr_dag.expressions), dtype=int)
    offsets.fill(offset)
    outputs_for_to = _get_output_indices(to_expr_dag)
    for i, expr in enumerate(to_expr_dag.expressions):
        if i in del_indices:
            offset -= 1
        else:
            head_expr = merged_expr_dag.expressions.add()
            head_expr.CopyFrom(to_expr_dag.expressions[i])
            # Continue the io_index numbering started in the from-op pass above.
            if i in external_inputs_for_to:
                head_expr.io_index = input_count
                input_count += 1
            if i in outputs_for_to:
                head_expr.io_index = output_count
                output_count += 1
            operand_indices = []
            for iRef in to_expr_dag.references[i].operand_indices:
                if iRef == to_pos_index:
                    # Redirect POSITION reads to the merged dag's single POSITION expression.
                    operand_indices.append(merge_pos_index)
                elif iRef in read_indices:
                    input_index = to_expr_dag.references[iRef].operand_indices[0]  # reference to tensor
                    # Redirect the tensor read to the VARIABLE that replaced the from-op output.
                    operand_indices.append(output_to_expr_index[input_to_output_index[expr_to_input_index[input_index]]])
                else:
                    operand_indices.append(int(offsets[iRef]) + iRef)
            merged_expr_dag.references.add().operand_indices.extend(operand_indices)
        offsets[i] = offset
    # Set the name and workgroup shape of the merged expression dag.
    merged_expr_dag.workgroup_shape.extend(from_expr_dag.workgroup_shape)
    merged_expr_dag.name = from_expr_dag.name + '_' + to_expr_dag.name
    return merged_expr_dag
_merged_dag_registry = {}
def _merge_op_dag(proto_op_dag, grad_dags):
"""
Merges the operator dag by decreasing the depth of the operator dag and increasing the depth of the expression dag.
:param proto_op_dag: The protobuf format of the operator dag.
:param grad_dags: The gradient dags, in order of the operators in the op dag
:return: A merged operator dag in protobuf format.
"""
op_dag_hash = hashlib.sha224(proto_op_dag.SerializeToString() + version.encode('utf-8')).hexdigest()
try:
return _merged_dag_registry[op_dag_hash]
except KeyError:
# Initialize the merged op-dag and the merge refs and their grouping.
merged_op_dag = proto_op_dag
merged_grad_dags = grad_dags
group_merge_refs = _group_merge_refs(merged_op_dag, _get_merge_refs_for_op_dag(merged_op_dag))
# While we can merge ops in the dag.
while len(group_merge_refs) > 0:
ops = merged_op_dag.operators
refs = merged_op_dag.references
merge_ref = group_merge_refs.pop(0)
to_op_index = merge_ref[0]
from_op_index = merge_ref[1]
logger.info('Merging ' + ops[from_op_index].name + ' and ' + ops[to_op_index].name)
indices_info = _get_indices_info(merged_op_dag, to_op_index, from_op_index)
if grad_dags is not None and None not in grad_dags:
# call f the downstream op and g the upstream op
# the merged op is then g(f(..), ..)
def get_expr_dag_io_info(expr_dag):
input_types = []
input_indices = []
output_types = []
output_indices = []
cur_input = 0
cur_output = 0
for expr_n, expr in enumerate(expr_dag.expressions):
if expr.code == lang.INPUT:
if cur_input != expr.io_index:
raise ValueError('Invalid Expression dag, received out of order input expression.')
cur_input += 1
input_types.append(expr.tensor_type)
input_indices.append(expr_n)
elif expr.code == lang.OUTPUT:
if cur_output != expr.io_index:
raise ValueError('Invalid expression dag, received out of order output expression.')
cur_output += 1
output_types.append(expr.tensor_type)
output_indices.append(expr_n)
return input_types, input_indices, output_types, output_indices
f_n = merge_ref[1]
g_n = merge_ref[0]
f = ops[f_n]
g = ops[g_n]
f_in_types, _, f_out_types, _ = get_expr_dag_io_info(f)
g_in_types, _, g_out_types, _ = get_expr_dag_io_info(g)
num_f_ins = len(f_in_types)
num_f_outs = len(f_out_types)
num_g_ins = len(g_in_types)
num_g_outs = len(g_out_types)
# classify input and output connects as being one of:
# g_in unshared - input to g not also used as an input to f
# f_out unshared - f output which remains external, not used by g
#
# note: all g outputs are untouched by merging process and remain external
g_ins_unshared = indices_info.external_inputs_for_to_io
g_ins_shared = set(range(num_g_ins)) - g_ins_unshared
f_outs_removed = indices_info.outputs_removed_in_from
# Note: this seems like to should be used_only_in_to_io, but appears not to be in some cases
# f_outs_internalized = indices_info.used_only_in_to_io
# f_outs_pruned = f_outs_removed - f_outs_internalized
# f_outs_remaining = set(range(num_f_outs)) - f_outs_removed
# try this instead
f_outs_internalized = set()
for g_in in g_ins_shared:
f_out = indices_info.input_to_ouput_index[g_in]
if f_out in f_outs_removed:
f_outs_internalized.add(f_out)
f_outs_pruned = f_outs_removed - f_outs_internalized
f_outs_remaining = set(range(num_f_outs)) - f_outs_removed
# find out which shared connections are duplicated
# find mapping of f_out -> g_in for duplicated and internalized connections
f_outs_to_g_ins_duplicated = {}
f_outs_to_g_ins_internalized = {}
g_ins_to_f_outs = {}
f_outs_duplicated = set()
for g_in in g_ins_shared:
f_out = indices_info.input_to_ouput_index[g_in]
g_ins_to_f_outs[g_in] = f_out
if f_out in f_outs_internalized:
if f_out not in f_outs_to_g_ins_internalized.keys():
f_outs_to_g_ins_internalized[f_out] = set()
f_outs_to_g_ins_internalized[f_out].add(g_in)
else:
if f_out not in f_outs_to_g_ins_duplicated.keys():
f_outs_to_g_ins_duplicated[f_out] = set()
f_outs_to_g_ins_duplicated[f_out].add(g_in)
f_outs_duplicated.add(f_out)
f_outs_unshared = f_outs_remaining - f_outs_duplicated
# create an _Operator from the f expression dag
f_inputs = []
for tt in f_in_types:
external_placeholder = _GradientPlaceholder(tt.shape, tt.dtype)
f_inputs.append(external_placeholder)
f_output_types = []
for tt in f_out_types:
f_output_types.append(TensorType.from_proto(tt))
f_op = _Operator(f, f_output_types, f_inputs, None, True, f.name)
# create a new set of _Operators that corresponds to g_grad_dag and wire up connections
# create inputs for g_grad_dag
g_grad_dag = grad_dags[g_n]
g_grad_dag_inputs = []
for g_grad_dag_in_n, tt in enumerate(g_grad_dag.dag_input_types):
if g_grad_dag_in_n in g_ins_to_f_outs.keys():
g_grad_dag_inputs.append(f_op[g_ins_to_f_outs[g_grad_dag_in_n]])
else:
external_placeholder = _GradientPlaceholder(tt.shape, tt.dtype)
g_grad_dag_inputs.append(external_placeholder)
def reify_op_dag(op_dag, dag_inputs):
operators = []
for op_n, op in enumerate(op_dag.operators):
op_inputs = []
for ref in op_dag.references[op_n].input_refs:
if ref.is_leaf:
op_inputs.append(dag_inputs[ref.dag_input_index])
else:
op_inputs.append(operators[ref.op_index][ref.op_output_index])
_, _, op_out_types_proto, _ = get_expr_dag_io_info(op)
op_out_types = []
for tt in op_out_types_proto:
op_out_types.append(TensorType.from_proto(tt))
operators.append(_Operator(op, op_out_types, op_inputs, None, True, op.name))
outputs = []
for dag_output in op_dag.dag_outputs:
outputs.append(operators[dag_output.op_index][dag_output.op_output_index])
return outputs
g_grad_outputs = reify_op_dag(g_grad_dag, g_grad_dag_inputs)
g_grad_external_outputs = []
for g_grad_n in sorted(g_ins_unshared):
g_grad_external_outputs.append(g_grad_outputs[g_grad_n])
# create inputs for f_grad
f_grad_inputs = []
for f_in_n in range(num_f_ins):
f_grad_inputs.append(f_inputs[f_in_n])
from .expression import output_like, position_in, output
@operator()
def gradient_sum(*args):
assert len(args) > 1
output_shape = args[0].shape
output_type = args[0].dtype
for arg in args:
assert arg.shape == output_shape
assert arg.dtype == output_type
out = output_like(args[0])
pos = position_in(output_shape)
s = args[0][pos]
for arg in args[1:]:
s += arg[pos]
out[pos] = s
return out
@operator()
def zeros(shape=None, dtype=None):
    """Produce a tensor of the given shape and dtype with every element zero."""
    result = output(shape, dtype)
    index = position_in(shape)
    result[index] = 0
    return result
f_grad_in_externals = []
for f_out_n in range(num_f_outs):
if f_out_n in f_outs_unshared:
external_placeholder = _GradientPlaceholder(f_out_types[f_out_n].shape, f_out_types[f_out_n].dtype)
f_grad_inputs.append(external_placeholder)
f_grad_in_externals.append(external_placeholder)
elif f_out_n in f_outs_duplicated:
external_placeholder = _GradientPlaceholder(f_out_types[f_out_n].shape, f_out_types[f_out_n].dtype)
shared = [external_placeholder]
for g_in in f_outs_to_g_ins_duplicated[f_out_n]:
shared.append(g_grad_outputs[g_in])
f_grad_inputs.append(gradient_sum(*shared))
f_grad_in_externals.append(external_placeholder)
elif f_out_n in f_outs_internalized:
shared = []
for g_in in f_outs_to_g_ins_internalized[f_out_n]:
shared.append(g_grad_outputs[g_in])
if len(shared) == 1:
f_grad_inputs.append(shared[0])
else:
f_grad_inputs.append(gradient_sum(*shared))
elif f_out_n in f_outs_pruned:
# external_placeholder = _PrunedInput(f_out_types[f_out_n].shape, f_out_types[f_out_n].dtype)
f_grad_inputs.append(zeros(shape=f_out_types[f_out_n].shape, dtype=f_out_types[f_out_n].dtype))
# output has been pruned, so merged op gets no external gradient for this output
else:
raise ValueError('f outputs should be one of: unshared, duplicated, internalized, pruned')
f_grad_dag = grad_dags[f_n]
f_grad_outputs = reify_op_dag(f_grad_dag, f_grad_inputs)
# append g_grad_external_outputs and generate new op dag
f_grad_outputs.extend(g_grad_external_outputs)
new_dag = _build_op_dag(*f_grad_outputs)
ordered_final_inputs = []
ordered_final_inputs.extend(f_inputs)
for g_in in sorted(g_ins_unshared):
ordered_final_inputs.append(g_grad_dag_inputs[g_in])
ordered_final_inputs.extend(f_grad_in_externals)
ordered_final_inputs.extend(g_grad_dag_inputs[-num_g_outs:])
found_inputs = new_dag.inputs
input_remap = {}
for found_index, found_input in enumerate(found_inputs):
grad_index = ordered_final_inputs.index(found_input)
input_remap[found_index] = grad_index
# copy the found proto dag
reordered_dag = lang.OperatorDAG()
reordered_dag.CopyFrom(new_dag.proto_dag)
# clear the input types and put in the correct full signature of gradient inputs
del reordered_dag.dag_input_types[:]
for grad_input in ordered_final_inputs:
cur_type = grad_input.tensor_type.as_proto()
reordered_dag.dag_input_types.add().CopyFrom(cur_type)
# update the input references to update dag_input_index values
for ref in reordered_dag.references:
for input_ref in ref.input_refs:
if input_ref.is_leaf:
input_ref.dag_input_index = input_remap[input_ref.dag_input_index]
merged_grad_dags[g_n] = reordered_dag
del merged_grad_dags[f_n]
merged_expr_dag = _merge_expr_dags(ops[to_op_index], ops[from_op_index], indices_info)
new_out_indices = indices_info.new_out_indices
external_inputs_for_to_io = indices_info.external_inputs_for_to_io
outputs_removed_in_from = indices_info.outputs_removed_in_from
output_from_wout_to_num = len(_get_output_indices(ops[from_op_index])) - len(indices_info.used_only_in_to)
# Build up the newly merged operator dag.
new_merged_op_dag = lang.OperatorDAG()
# Update the operators.
for i, op in enumerate(ops):
# Replace the from operator by the merged operator.
if i == from_op_index:
new_merged_op_dag.operators.add().CopyFrom(merged_expr_dag)
# Copy any op that is neither the from-operator nor the to-operator.
elif i != to_op_index:
new_merged_op_dag.operators.add().CopyFrom(op)
# Update the input references to operators.
for i, ref in enumerate(refs):
# Exclude references for the to-operator.
if i == to_op_index:
continue
# Update all other references.
ref_list = lang.OperatorDAG.OperatorInputReferences()
for in_ref in ref.input_refs:
proto_ref = lang.OperatorDAG.OperatorInputReference()
proto_ref.is_leaf = in_ref.is_leaf
op_index = in_ref.op_index
# Input index is the same input from the op dag.
proto_ref.dag_input_index = in_ref.dag_input_index
if op_index == from_op_index:
proto_ref.op_output_index = new_out_indices[in_ref.op_output_index]
elif op_index == to_op_index:
proto_ref.op_output_index = output_from_wout_to_num + in_ref.op_output_index
else:
proto_ref.op_output_index = in_ref.op_output_index
if op_index > to_op_index:
op_index -= 1
elif op_index == to_op_index:
op_index = from_op_index
proto_ref.op_index = op_index
ref_list.input_refs.add().CopyFrom(proto_ref)
if i == from_op_index:
# Uses the to_op_index in refs to get the input_refs!
for input_index, in_ref in enumerate(refs[to_op_index].input_refs):
# Add the input reference only if this input of the 'to-op' has not yet been mapped to an output of
# the 'from-op'.
if input_index in external_inputs_for_to_io:
proto_ref = lang.OperatorDAG.OperatorInputReference()
proto_ref.is_leaf = in_ref.is_leaf
op_index = in_ref.op_index
if op_index > to_op_index:
op_index -= 1
elif op_index == to_op_index:
op_index = from_op_index
proto_ref.op_index = op_index
proto_ref.op_output_index = in_ref.op_output_index
proto_ref.dag_input_index = in_ref.dag_input_index
ref_list.input_refs.add().CopyFrom(proto_ref)
new_merged_op_dag.references.add().CopyFrom(ref_list)
# Create the outputs of the merged op.
for i, dag_output in enumerate(merged_op_dag.dag_outputs):
proto_out = lang.OperatorDAG.DAGOutputReference()
op_index = dag_output.op_index
proto_out.op_output_index = dag_output.op_output_index
if op_index == to_op_index:
proto_out.op_output_index += output_from_wout_to_num
if op_index == from_op_index:
# Subtract as many from the op-output-index as has been removed that are smaller or equal to this index.
sub_value = 0
for out_remove_io in outputs_removed_in_from:
if out_remove_io <= proto_out.op_output_index:
sub_value += 1
proto_out.op_output_index -= sub_value
if op_index > to_op_index:
op_index -= 1
elif op_index == to_op_index:
op_index = from_op_index
proto_out.op_index = op_index
new_merged_op_dag.dag_outputs.add().CopyFrom(proto_out)
merged_op_dag = new_merged_op_dag
group_merge_refs = _group_merge_refs(merged_op_dag, _get_merge_refs_for_op_dag(merged_op_dag))
# Copy the input types only if the merged op dag differs from the initially provided op dag.
if merged_op_dag is not proto_op_dag:
for dag_input in proto_op_dag.dag_input_types:
proto_type = TensorType.like(dag_input).as_proto()
merged_op_dag.dag_input_types.add().CopyFrom(proto_type)
if merged_grad_dags is not None and None not in merged_grad_dags:
opt_grad_dags = []
for grad_dag in merged_grad_dags:
gd, _ = _merge_op_dag(grad_dag, None)
opt_grad_dags.append(gd)
else:
opt_grad_dags = merged_grad_dags
_merged_dag_registry[op_dag_hash] = (merged_op_dag, opt_grad_dags)
return merged_op_dag, opt_grad_dags
def _make_generic_c(src, name):
    """Build (or fetch from the cache) the generic C++ shared library for an op.

    The compiled ``.so`` is keyed by *name* in the operator cache directory, so
    a later call with the same name skips compilation entirely.

    :param src: generic C++ source text for the operator
    :param name: unique operator name used to key the cache entry
    :return: path to the compiled shared library
    :raises subprocess.CalledProcessError: if the C++ compiler fails
    """
    # look for generic c++ shared library in the operator cache
    so_path = os.path.join(cache_directory, name + '_generic_cpp.so')
    if os.path.exists(so_path):
        return so_path
    logger.debug('Compiling generic C++ for Op ' + name)
    cpp_path = os.path.join(cache_directory, name + '_generic_cpp.cpp')
    with open(cpp_path, 'w') as f:
        f.write(src)
    # headers live next to this module
    include_dir = os.path.split(os.path.abspath(__file__))[0]
    compile_cmd = [cxx, '-fPIC', '-std=c++11', '-g', '-pedantic',
                   '-Wall', '-Wextra',
                   '-I' + include_dir,
                   '-shared',
                   '-o', so_path, cpp_path]
    try:
        subprocess.check_output(compile_cmd,
                                stderr=subprocess.STDOUT,
                                universal_newlines=True)
    except subprocess.CalledProcessError as exception:
        logger.error('c++ compiler error: ' + exception.output)
        raise
    return so_path
def _make_generic_cuda(src, name):
    """Build (or fetch from the cache) the generic CUDA shared library for an op.

    Compiles *src* with nvcc to a relocatable object file, links it into a
    shared library keyed by *name* in the operator cache directory, and removes
    the intermediate object file. A later call with the same name finds the
    cached ``.so`` and skips compilation.

    :param src: generic CUDA source text for the operator
    :param name: unique operator name used to key the cache entry
    :return: path to the compiled shared library
    :raises subprocess.CalledProcessError: if nvcc fails at either stage
    """
    # look for generic cuda shared library in the operator cache
    generic_cuda_so_path = os.path.join(cache_directory, name + '_generic_cuda.so')
    if not os.path.exists(generic_cuda_so_path):
        logger.debug('Compiling generic CUDA for Op ' + name)
        # generate and compile generic cuda operator
        nvcc_path = os.path.join(cuda_directory, 'bin/nvcc')
        generic_cuda_path = os.path.join(cache_directory, name + '_generic_cuda.cu')
        generic_cuda_o_path = os.path.join(cache_directory, name + '_generic_cuda.o')
        with open(generic_cuda_path, 'w') as f:
            f.write(src)
        this_file_path = os.path.abspath(__file__)
        this_directory = os.path.split(this_file_path)[0]
        try:
            # compile to a relocatable object, then link into a shared library
            subprocess.check_output([nvcc_path, '-O3', '--use_fast_math', '--relocatable-device-code=true', '--compile',
                                     '-Xcompiler', '-fPIC', '-std=c++11', '-I'+this_directory,
                                     generic_cuda_path, '-o', generic_cuda_o_path],
                                    stderr=subprocess.STDOUT,
                                    universal_newlines=True)
            subprocess.check_output([nvcc_path, '-shared', '-o', generic_cuda_so_path, generic_cuda_o_path],
                                    stderr=subprocess.STDOUT,
                                    universal_newlines=True)
        except subprocess.CalledProcessError as exception:
            logger.error('nvcc error: ' + exception.output)
            raise
        # clean up .o files; os.remove is portable and avoids spawning an
        # external 'rm' process (the previous subprocess.call also silently
        # ignored failures)
        os.remove(generic_cuda_o_path)
    return generic_cuda_so_path
def evaluate(output_list, target_language='cpp', opt_level=3):
    """
    Run a collection of OVL operators and return their results, mainly for testing.

    Drives the generated generic version of each operator through the test
    harness, so no external execution runtime is needed; consequently it only
    works for operators whose inputs are numpy arrays.

    :param output_list: The outputs to evaluate
    :param target_language: 'cpp' or 'cuda'
    :param opt_level: Optimization level.
    :return: A single numpy array when there is exactly one output, otherwise
        a list of numpy arrays, one per operator output in output_list
    """
    # profile() returns (outputs, timings); a single iteration is enough here.
    results = profile(output_list, target_language=target_language,
                      profiling_iterations=1, opt_level=opt_level)[0]
    if len(results) == 1:
        return results[0]
    return results
def profile(output_list, target_language, profiling_iterations, opt_level):
    """
    Evaluate a collection of OVL operators, mainly used for testing. This function uses a test operator
    function for running the generated generic version of the operator so it does not depend on an external
    execution runtime. This also means that this function only works for operators whose inputs are numpy arrays.
    :param output_list: The outputs to evaluate
    :param target_language: 'cpp' or 'cuda'
    :param profiling_iterations: Number of times to run this operator for profiling purposes.
        Must be a positive int.
    :param opt_level: optimization level
    :return: A tuple containing a list of numpy arrays for each operator output in output_list, and a dictionary of
        numpy arrays containing the execution times for each operator in the operator DAG.
    """
    # Generate the protobuf header file.
    # Since all we need for the test libraries is the DType enum, do not use protoc to generate the
    # fully functional protobuf code, since this introduces a dependency on the C++ protobuf development libraries.
    proto_header = os.path.join(cache_directory, 'language_dtype.h')
    if not os.path.exists(proto_header):
        enum_src = ''
        for enum_name, enum_val in lang.DType.items():
            enum_src += ' ' + enum_name + ' = ' + str(enum_val) + ',\n'
        # generate header enum; the '|' margin markers are stripped by the
        # re.sub below, so the template can be indented freely here
        h_src = """
        |//Generated Code - do not edit
        |#ifndef LANGUAGE_DTYPE_H
        |#define LANGUAGE_DTYPE_H
        |namespace opveclib {
        |enum DType {
        |${enum_src}
        |};
        |}
        |#endif // LANGUAGE_DTYPE_H
        """
        h_src = string.Template(h_src).substitute(locals())
        h_src = re.sub('\n[ \t]*\|', '\n', h_src)
        with open(proto_header, 'w') as f:
            f.write(h_src)
    # Dynamically load the test library, compile if necessary
    invalid_language = 'Unsupported target_language: ' + target_language
    if target_language == 'cpp':
        testlib_path = os.path.join(cache_directory, 'libtestcop.so.'+version)
        try:
            libtest = ctypes.cdll.LoadLibrary(testlib_path)
        except OSError:
            # library not built yet - compile it now against the TF headers
            import tensorflow as tf
            this_file_path = os.path.abspath(__file__)
            this_directory = os.path.split(this_file_path)[0]
            tf_include = tf.sysconfig.get_include()
            # build the test framework library
            cc_path = os.path.join(this_directory, 'testcop.cc')
            try:
                subprocess.check_output([cxx, '-fPIC', '-Wall', '-shared',
                                         '-std=c++11', '-Ofast', '-Wextra',
                                         '-I'+this_directory,
                                         '-I'+cache_directory,
                                         '-isystem', tf_include,
                                         '-o', testlib_path, cc_path],
                                        stderr=subprocess.STDOUT,
                                        universal_newlines=True)
            except subprocess.CalledProcessError as exception:
                logger.error('c++ compiler error: ' + exception.output)
                raise
            libtest = ctypes.cdll.LoadLibrary(testlib_path)
        # declare the C entry point signature for ctypes
        test_c_op = libtest.testCOperator
        test_c_op.restype = ctypes.c_int16
        test_c_op.argtypes = \
            [ctypes.c_char_p, ctypes.c_char_p,
             ndpointer(dtype=_TensorParam, flags="C_CONTIGUOUS"), ctypes.c_size_t,
             ndpointer(dtype=_TensorParam, flags="C_CONTIGUOUS"), ctypes.c_size_t,
             ndpointer(dtype=ctypes.c_double, flags="C_CONTIGUOUS"), ctypes.c_size_t]
        test_cuda_op = None
    elif target_language == 'cuda':
        testlib_path = os.path.join(cache_directory, 'libtestcudaop.so.'+version)
        try:
            libtest = ctypes.cdll.LoadLibrary(testlib_path)
        except OSError:
            this_file_path = os.path.abspath(__file__)
            this_directory = os.path.split(this_file_path)[0]
            # build the test framework library
            cc_path = os.path.join(this_directory, 'testcudaop.cc')
            o_path = os.path.join(cache_directory, 'testcudaop.o')
            nvcc_path = os.path.join(cuda_directory, 'bin/nvcc')
            try:
                subprocess.check_output([nvcc_path, '-O3', '--relocatable-device-code=true',
                                         '-x', 'cu', '--compile', '-Xcompiler',
                                         '-fPIC', '-std=c++11',
                                         '-I'+this_directory,
                                         '-I'+cache_directory,
                                         cc_path, '-o', o_path],
                                        stderr=subprocess.STDOUT,
                                        universal_newlines=True)
                # relocatable device code has to be defined when linking in addition
                # to compiling. The default compiler, g++, has no concept of this, so
                # we have to do an extra device code link step with a dummy link file
                linko_path = os.path.join(cache_directory, 'link.o')
                subprocess.check_output([nvcc_path, '-dlink', '-Xcompiler', '-fPIC',
                                         '-o', linko_path, o_path],
                                        stderr=subprocess.STDOUT,
                                        universal_newlines=True)
                subprocess.check_output([cxx, '-shared',
                                         '-o', testlib_path, o_path, linko_path,
                                         '-lcuda'],
                                        stderr=subprocess.STDOUT,
                                        universal_newlines=True)
            except subprocess.CalledProcessError as exception:
                logger.error('nvcc error: ' + exception.output)
                raise
            # clean up .o files
            subprocess.call(['rm', o_path, linko_path])
            libtest = ctypes.cdll.LoadLibrary(testlib_path)
        # declare the CUDA entry point signature for ctypes (extra c_uint16 is
        # the threads-per-block parameter, see the call site below)
        test_cuda_op = libtest.testCUDAOperator
        test_cuda_op.restype = ctypes.c_int16
        test_cuda_op.argtypes = \
            [ctypes.c_char_p, ctypes.c_char_p,
             ndpointer(dtype=_TensorParam, flags="C_CONTIGUOUS"), ctypes.c_size_t,
             ndpointer(dtype=_TensorParam, flags="C_CONTIGUOUS"), ctypes.c_size_t,
             ctypes.c_uint16,
             ndpointer(dtype=ctypes.c_double, flags="C_CONTIGUOUS"), ctypes.c_size_t]
        test_c_op = None
    else:
        raise ValueError(invalid_language)
    op_dag = _build_op_dag(*output_list)
    dag = op_dag.proto_dag
    if opt_level >= 3:
        # merge compatible operators at the highest optimization level
        dag, _ = _merge_op_dag(dag, None)
    inputs = op_dag.inputs
    output_buffers = []
    profiling_times = {}
    # compile all ops in the dag
    for op_index, op in enumerate(dag.operators):
        name = _op_hash(op)
        # generate code
        op_c_generic, op_cuda_generic = \
            ExpressionDAG.generate(op, name)
        input_types, output_types = ExpressionDAG.io_types()
        num_inputs = len(input_types)
        num_outputs = len(output_types)
        eval_times_ms = np.empty(profiling_iterations, dtype=np.float64)
        eval_times_ms[:] = np.nan
        # marshal input buffers: DAG-level inputs for leaves, outputs of
        # upstream operators otherwise
        cur_input_params = np.empty(num_inputs, dtype=_TensorParam)
        for input_index, input_ref in enumerate(dag.references[op_index].input_refs):
            if input_ref.is_leaf:
                cur_buffer = inputs[input_ref.dag_input_index]
            else:
                cur_buffer = output_buffers[input_ref.op_index][input_ref.op_output_index]
            cur_data = cur_buffer.ctypes.data
            cur_dtype = ctypes.c_int(input_types[input_index].dtype.proto_dtype)
            cur_len = ctypes.c_size_t(input_types[input_index].size)
            cur_input_params[input_index] = _TensorParam(data=cur_data, dtype=cur_dtype, len=cur_len)
        # allocate output memory for the current operator
        cur_output_params = np.empty(num_outputs, dtype=_TensorParam)
        output_buffers.append([])
        cur_buffers = output_buffers[-1]
        for output_index, output_type in enumerate(output_types):
            t = output_type.dtype.as_numpy()
            new_buffer = np.empty(output_type.shape, dtype=t)
            cur_buffers.append(new_buffer)
            cur_data = new_buffer.ctypes.data
            cur_dtype = ctypes.c_int(output_type.dtype.proto_dtype)
            cur_len = ctypes.c_size_t(output_type.size)
            cur_output_params[output_index] = _TensorParam(data=cur_data, dtype=cur_dtype, len=cur_len)
        # evaluate the outputs
        if target_language == 'cpp':
            lib_path = _make_generic_c(op_c_generic, name)
            lib_path = ctypes.c_char_p(lib_path.encode('ascii'))
            f_name = ctypes.c_char_p((name+'_generic_cpp').encode('ascii'))
            # NOTE(review): err is never checked - presumably nonzero signals
            # failure; confirm against testcop.cc before relying on it
            err = test_c_op(lib_path, f_name,
                            cur_input_params, ctypes.c_size_t(num_inputs),
                            cur_output_params, ctypes.c_size_t(num_outputs),
                            eval_times_ms,
                            ctypes.c_size_t(profiling_iterations))
        elif target_language == 'cuda':
            lib_path = _make_generic_cuda(op_cuda_generic, name)
            lib_path = ctypes.c_char_p(lib_path.encode('ascii'))
            f_name = ctypes.c_char_p((name+'_generic_cuda').encode('ascii'))
            err = test_cuda_op(lib_path, f_name,
                               cur_input_params, ctypes.c_size_t(num_inputs),
                               cur_output_params, ctypes.c_size_t(num_outputs),
                               ctypes.c_uint16(_default_cuda_threads_per_block),
                               eval_times_ms,
                               ctypes.c_size_t(profiling_iterations))
        else:
            raise ValueError(invalid_language)
        profiling_times[name] = eval_times_ms
        # TODO: deallocate output buffers that are no longer needed
    outputs = []
    for out_ref in dag.dag_outputs:
        outputs.append(output_buffers[out_ref.op_index][out_ref.op_output_index])
    return outputs, profiling_times
def as_tensorflow(tensor_list, opt_level=3):
    """
    Create a DAG of TensorFlow operators based on a DAG of OVL operators and register it with the current
    TensorFlow Graph. The inputs to the DAG must be numpy arrays or TensorFlow tensors.
    :param tensor_list: operator outputs to convert to TensorFlow tensors
    :param opt_level: optimization level to use
    :return: A TensorFlow operator.
    """
    dag_info = _build_op_dag(*tensor_list)
    proto_dag = dag_info.proto_dag
    gradient_dags = dag_info.grad_dags
    # merge compatible operators at the highest optimization level
    if opt_level >= 3:
        proto_dag, gradient_dags = _merge_op_dag(proto_dag, gradient_dags)
    return _dag_to_tf(proto_dag, dag_info.inputs, gradient_dags)
def _dag_to_tf(dag, inputs, grad_dags):
    """Register every operator of *dag* as a TensorFlow dynamic-lib op.

    Each operator is compiled to its generic C++ (and, when CUDA is enabled,
    CUDA) library and wrapped via _DynamicLibOp, with upstream op outputs and
    *inputs* wired into each node. Serialized gradient DAGs are attached when
    available.

    :return: a single tensor when the DAG declares one output, otherwise a
        list of tensors.
    """
    op_outputs = []
    # compile all ops in the dag
    for op_index, op in enumerate(dag.operators):
        name = _op_hash(op)
        # generate code
        c_src, cuda_src = \
            ExpressionDAG.generate(op, name)
        cpu_op_lib = _make_generic_c(c_src, name)
        cuda_op_lib = _make_generic_cuda(cuda_src, name) if cuda_enabled else ''
        input_types, output_types = ExpressionDAG.io_types()
        out_tf_types = [t.dtype.as_tensorflow() for t in output_types]
        out_shapes = [t.shape for t in output_types]
        # leaf references read from the DAG inputs, others from upstream ops
        cur_inputs = [
            inputs[ref.dag_input_index] if ref.is_leaf
            else op_outputs[ref.op_index][ref.op_output_index]
            for ref in dag.references[op_index].input_refs
        ]
        if grad_dags is not None and grad_dags[op_index] is not None:
            serialized_grad_dag = grad_dags[op_index].SerializeToString()
        else:
            serialized_grad_dag = ''
        tf_op = _DynamicLibOp.module().dynamic_lib(name='opveclib_' + op.name,
                                                   inputs=cur_inputs,
                                                   out_shapes=out_shapes,
                                                   out_types=out_tf_types,
                                                   cpu_lib_path=cpu_op_lib,
                                                   cpu_func_name=name + '_generic_cpp',
                                                   gpu_lib_path=cuda_op_lib,
                                                   gpu_func_name=name + '_generic_cuda',
                                                   serialized_grad_dag=serialized_grad_dag,
                                                   cuda_threads_per_block=_default_cuda_threads_per_block)
        op_outputs.append(tf_op)
    results = [op_outputs[out.op_index][out.op_output_index]
               for out in dag.dag_outputs]
    if len(results) == 1:
        return results[0]
    return results