TVM 在 CPU 上的优化
概览
- tvm 在 CPU 上的优化需要借助 llvm 作为基础,在 llvm 的优化基础之前,tvm 针对自身计算图给出了两点重要的优化:
- 对内存访问增加 cache 命中率:这需要将原来的内存访问模式转化为适合 cache 的内存访问模式。
- SIMD(Single instruction multi-data), 向量化处理单元:每次处理的是一组数据而不是一个数据。这要求我们修改数据访问模式,使得数据访问以组的形式访问,这样 LLVM 后端就可以基于这个访问将其向量化。
实例分析
通过矩阵乘法来了解 TVM 在 CPU上的优化。我们通过定义一个1024x1024 的 float32 矩阵来评估 TVM 在 CPU 上的优化。
基准时间测试 (baseline)
首先是 TVM 在原生算法(为经过任何优化可读性强的矩阵乘算法) 上的优化:
import tvm import numpy # The size of the square matrix N = 1024 # The default tensor type in tvm dtype = "float32" # Random generated tensor for testing a = tvm.nd.array(numpy.random.rand(N, N).astype(dtype), tvm.cpu(0)) b = tvm.nd.array(numpy.random.rand(N, N).astype(dtype), tvm.cpu(0)) # Raw Algorithm k = tvm.reduce_axis((0, N), 'k') A = tvm.placeholder((N, N), name = 'A') B = tvm.placeholder((N, N), name = 'B') C = tvm.compute( A.shape, lambda x, y: tvm.sum(A[x, k] * B[k, y], axis = k), name = 'C') # Default schedule s = tvm.create_schedule(C.op) func = tvm.build(s, [A, B, C], name = 'mmult') c = tvm.nd.array(numpy.zeros((N, N), dtype = dtype), tvm.cpu(0)) func(a, b, c) evaluator = func.time_evaluator(func.entry_name, tvm.cpu(0), number=1) print('Baseline: %f' % evaluator(a, b, c).mean)
在这一段代码中,首先定义 a, b 两个 1024*1024 随机矩阵,然后通过
# Raw Algorithm
注释下的代码来完成计算图的计算流程, 其中C = compute(...)
就是典型的,未经过任何优化的矩阵点乘计算。之后通过这个原始的计算算法获得这个矩阵乘计算所需要的时间(单位 s):Baseline: 2.111667
在这段代码中,可以通过
print(tvm.lower(s, [A, B, C], simple_mode=True))
来获得 TVM 的低层次 IR对于矩阵乘法的组织形式:produce C { for (x, 0, 1024) { for (y, 0, 1024) { C[((x*1024) + y)] = 0.000000f for (k, 0, 1024) { C[((x*1024) + y)] = (C[((x*1024) + y)] + (A[((x*1024) + k)]*B[(y + (k*1024))])) } } } }
第一个优化方案: Blocking
提高 cache 命中率的一个重要技巧为矩阵分块,循环平铺: 数据将被分块计算。在分块后,对于一个块内的内存访问大部分都是近邻的,所以可以提高 cache 命中率。
比如,将分块大小设置为32*32,则由 float32 可知,一个块的大小将为 32*32*32 bit = 4KB,对于 Intel i7 的 CPU L1 Cache 为 32KB, 因此整个块在计算的时候可以全部放入在 cache 中,从而减少 cache miss 的概率。
由此,可以把上述 1024*1024 的矩阵分割多个为 32*32 的矩阵,从而通过循环平铺的技术对矩阵乘法的 cache 命中率进行优化; 以下代码中的 tile 函数就是将矩阵乘法进行循环平铺计算,并对优化后的结果进行了评估
bn = 32 s = tvm.create_schedule(C.op) # Blocking by loop tiling xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn) # Hoist reduction domain outside the blocking loop s[C].reorder(xo, yo, k, xi, yi) func = tvm.build(s, [A, B, C], name = 'mmult') assert func c = tvm.nd.array(numpy.zeros((N, N), dtype = dtype), tvm.cpu(0)) func(a, b, c) numpy.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5) # By simply tiling the loop 32x32, and hoisting k outside the blocking loops, we can see big # speedup compared with the baseline. evaluator = func.time_evaluator(func.entry_name, tvm.cpu(0), number=5) print('Opt1: %f' % evaluator(a, b, c).mean)
此时获得的矩阵乘法时间输出为
Opt1: 0.897558
此时优化后的代码为
produce C { for (x.outer, 0, 32) { for (y.outer, 0, 32) { for (x.inner.init, 0, 32) { for (y.inner.init, 0, 32) { C[(((((x.outer*1024) + y.outer) + (x.inner.init*32))*32) + y.inner.init)] = 0.000000f } } for (k, 0, 1024) { for (x.inner, 0, 32) { for (y.inner, 0, 32) { C[(((((x.outer*1024) + y.outer) + (x.inner*32))*32) + y.inner)] = (C[(((((x.outer*1024) + y.outer) + (x.inner*32))*32) + y.inner)] + (A[(((x.outer*32768) + k) + (x.inner*1024))]*B[(((y.outer + (k*32))*32) + y.inner)])) } } } } } }
Reference: tiling 乘法
第二步优化方案: Array Packing
在对矩阵乘法的循环平铺后,我们发现在计算32*32的矩阵相乘时,由于矩阵乘法的第二元仍然处于非连续性访问,所以在此还可以对矩阵第二元的访问改变得更加连续。
改变方法就是将第二元矩阵分割为条状,如16*16的矩阵可以分割为 [16/4][16][4], 这样在乘法时可以看做是单一16*4列向量进行乘法,从而更加连续。
下面的测试不采用平铺的方法只采用 Array Packing 的方法,具体的代码如下:
# We have to re-write the algorithm slightly. packedB = tvm.compute((N / bn, N, bn), lambda x, y, z: B[y, x * bn + z], name = 'packedB') C = tvm.compute(A.shape, lambda x, y: tvm.sum(A[x, k] * packedB[y / bn, k, y % bn], axis = k), name = 'C') s = tvm.create_schedule(C.op) xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn) s[C].reorder(xo, yo, k, xi, yi) func = tvm.build(s, [A, B, C], name = 'mmult') assert func c = tvm.nd.array(numpy.zeros((N, N), dtype = dtype), tvm.cpu(0)) func(a, b, c) numpy.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5) evaluator = func.time_evaluator(func.entry_name, tvm.cpu(0), number=5) print('Opt2: %f' % evaluator(a, b, c).mean)
获得的测试结果为:
Opt2: 1.096825
产生的 tvm 优化后 IR 结果为:
// attr [packedB] storage_scope = "global" allocate packedB[float32 * 32 * 1024 * 32] produce packedB { for (x, 0, 32) { for (y, 0, 1024) { for (z, 0, 32) { packedB[((((x*1024) + y)*32) + z)] = B[(((x + (y*32))*32) + z)] } } } } produce C { for (x.outer, 0, 32) { for (y.outer, 0, 32) { for (x.inner.init, 0, 32) { for (y.inner.init, 0, 32) { C[(((((x.outer*1024) + y.outer) + (x.inner.init*32))*32) + y.inner.init)] = 0.000000f } } for (k, 0, 1024) { for (x.inner, 0, 32) { for (y.inner, 0, 32) { C[(((((x.outer*1024) + y.outer) + (x.inner*32))*32) + y.inner)] = (C[(((((x.outer*1024) + y.outer) + (x.inner*32))*32) + y.inner)] + (A[(((x.outer*32768) + k) + (x.inner*1024))]*packedB[((((y.outer*1024) + k)*32) + y.inner)])) } } } } } }
优化方法三: Vectorization
当检测到访问内存的连续性时,便可以通过向量化来进一步提升速度。向量化是一种 SIMD (单指令多数据) 技术,可以通过硬件进行向量计算,在循环平铺后,我们获得了连续的空间访问,所以可以通过向量化进行,如下代码:
s = tvm.create_schedule(C.op) xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn) s[C].reorder(xo, yo, k, xi, yi) # Vectorization s[C].vectorize(yi) func = tvm.build(s, [A, B, C], name = 'mmult') assert func c = tvm.nd.array(numpy.zeros((N, N), dtype = dtype), tvm.cpu(0)) func(a, b, c) numpy.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5) evaluator = func.time_evaluator(func.entry_name, tvm.cpu(0), number=5) print('Opt3: %f' % evaluator(a, b, c).mean)
获得的时间结果为:
Opt3: 0.282522
优化方法4:并行化
在向量化的基础上,还可以通过并行化大幅提高速度;本次测试采用的节点有16个物理核,32个逻辑核参与并行;
并行的代码为
s = tvm.create_schedule(C.op) xo, yo, xi, yi = s[C].tile(C.op.axis[0], C.op.axis[1], bn, bn) s[C].reorder(xo, yo, xi, k, yi) # vectorize s[C].vectorize(yi) # parallel s[C].parallel(xo) func = tvm.build(s, [A, B, C], name = 'mmult') assert func c = tvm.nd.array(numpy.zeros((N, N), dtype = dtype), tvm.cpu(0)) func(a, b, c) numpy.testing.assert_allclose(c.asnumpy(), answer, rtol=1e-5) evaluator = func.time_evaluator(func.entry_name, tvm.cpu(0), number=50) opt5_time = evaluator(a, b, c).mean print('Opt4: %f' % opt5_time)
获得的运行时间结果为:
Opt4: 0.015810
此时优化生成的 tvm IR 代码为
// attr [packedB] storage_scope = "global" allocate packedB[float32 * 32 * 1024 * 32] produce packedB { for (x, 0, 32) { for (y, 0, 1024) { for (z, 0, 32) { packedB[((((x*1024) + y)*32) + z)] = B[(((x + (y*32))*32) + z)] } } } } produce C { for (x.outer, 0, 32) { for (y.outer, 0, 32) { for (x.inner.init, 0, 32) { C[ramp(((((x.outer*1024) + y.outer) + (x.inner.init*32))*32), 1, 32)] = x32(0.000000f) } for (k, 0, 1024) { for (x.inner, 0, 32) { C[ramp(((((x.outer*1024) + y.outer) + (x.inner*32))*32), 1, 32)] = (C[ramp(((((x.outer*1024) + y.outer) + (x.inner*32))*32), 1, 32)] + (x32(A[(((x.outer*32768) + k) + (x.inner*1024))])*packedB[ramp((((y.outer*1024) + k)*32), 1, 32)])) } } } } }
总结
总体而言,在 CPU 的优化上 tvm 的采取的主要策略是针对 cache 的命中率优化和向量化、并行化的优化方法,这个优化最终结果能快于 numpy (使用 BLAS) 自身的矩阵乘法(大约快30%); 但从实际应用情况来看,目前许多的矩阵运算库也能达到这样的结果。我们更期待的是 TVM 能够针对计算图这一特定计算模式给出的一些新颖的优化策略。