#### Part 2: Practical Aspects with Python

Photo by Elton Luz on Unsplash

In this article we will follow the tenets of TDD for developing Scientific Software as laid out in the first installment of this series to develop an edge detection filter known as the Sobel filter.

In the first article, we talked about how important — and tricky — it can be to develop reliable testing methods for software which the complex problems often found in scientific software. We also saw how to overcome those issues by following a development cycle inspired by TDD, but adapted for scientific computing. I reproduce a shortened version of these instructions below.

#### Implementation cycle

Gather requirementsSketch the designImplement initial testsImplement your alpha versionBuild an oracle libraryRevisit testsImplement profiling

#### Optimization cycle

OptimizeReimplement

#### New method cycle

Implement new methodValidate against previous curated oracles

### Getting Started: The Sobel Filter

In this article, we will follow the above instructions to develop a function which applies the Sobel filter. The Sobel filter is a commonly used computer vision tool to detect or enhance edges in images. Keep reading to see some examples!

Figure 1. Kernels for the Sobel–Feldman operator. Credit: own work.

Starting with the first step, we gather some requirements. We will follow the standard formulation of the Sobel filter described in this article. Simply put, the Sobel operator consists of convolving image A with the following two 3 × 3 kernels, squaring each pixel in the resulting images, summing them and taking the point-by-point square root. If Ax and Ay are the images resulting from the convolutions, the result of the Sobel filter S is √(Ax² + Ay²).

### Requirements

We want this function to take any 2D array and generate another 2D array. We might want it to operate on any two axes of an ndarray. We might even want it to work on more (or less) than two axes. We might have specifications for how to handle the edges of the array.

For now let’s remember to keep it simple, and start with a 2D implementation. But before we do that, let’s sketch the design.

### Sketch the Design

We start with a simple design, leveraging Python’s annotations. I highly recommend annotating as much as possible, and using mypy as a linter.

from typing import Tuple

from numpy.core.multiarray import normalize_axis_index

from numpy.typing import NDArray

def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:

“””Compute the Sobel filter of an image

Parameters

———-

arr : NDArray

Input image

axes : Tuple[int, int], optional

Axes over which to compute the filter, by default (-2, -1)

Returns

——-

NDArray

Output

“””

# Only accepts 2D arrays

if arr.ndim != 2:

raise NotImplementedError

# Ensure that the axis[0] is the first axis, and axis[1] is the second

# axis. The obscure `normalize_axis_index` converts negative indices to

# indices between 0 and arr.ndim – 1.

if any(

normalize_axis_index(ax, arr.ndim) != i

for i, ax in zip(range(2), axes)

):

raise NotImplementedError

pass

### Implement Tests

This function doesn’t do much. But it is documented, annotated, and its current limitations are baked into it. Now that we have a design, we immediately pivot to tests.

First, we notice that empty images (all zeroes) have no edges. So they must output zeroes as well. In fact, any constant image should also return zeros. Let’s bake that into out first tests. We will also see how we can use monkey testing to test many arrays.

# test_zero_constant.py

import numpy as np

import pytest

# Test multiple dtypes at once

@pytest.mark.parametrize(

“dtype”,

[“float16”, “float32”, “float64”, “float128”],

)

def test_zero(dtype):

# Set random seed

seed = int(np.random.rand() * (2**32 – 1))

np.random.seed(seed)

# Create a 2D array of random shape and fill with zeros

nx, ny = np.random.randint(3, 100, size=(2,))

arr = np.zeros((nx, ny), dtype=dtype)

# Apply sobel function

arr_sob = sobel(arr)

# `assert_array_equal` should fail most of the times.

# It will only work when `arr_sob` is identically zero,

# which is usually not the case.

# DO NOT USE!

# np.testing.assert_array_equal(

# arr_sob, 0.0, err_msg=f”{seed=} {nx=}, {ny=}”

# )

# `assert_almost_equal` can fail when used with high decimals.

# It also relies on float64 checking, which might fail for

# float128 types.

# DO NOT USE!

# np.testing.assert_almost_equal(

# arr_sob,

# np.zeros_like(arr),

# err_msg=f”{seed=} {nx=}, {ny=}”,

# decimal=4,

# )

# `assert_allclose` with custom tolerance is my preferred method

# The 10 is arbitrary and depends on the problem. If a method

# which you know to be correct does not pass, increase to 100, etc.

# If the tolerance needed to make the tests pass is too high, make

# sure the method is actually correct.

tol = 10 * np.finfo(arr.dtype).eps

err_msg = f”{seed=} {nx=}, {ny=} {tol=}” # Log seeds and other info

np.testing.assert_allclose(

arr_sob,

np.zeros_like(arr),

err_msg=err_msg,

atol=tol, # rtol is useless for desired=zeros

)

@pytest.mark.parametrize(

“dtype”, [“float16”, “float32”, “float64”, “float128”]

)

def test_constant(dtype):

seed = int(np.random.rand() * (2**32 – 1))

np.random.seed(seed)

nx, ny = np.random.randint(3, 100, size=(2,))

constant = np.random.randn(1).item()

arr = np.full((nx, ny), fill_value=constant, dtype=dtype)

arr_sob = sobel(arr)

tol = 10 * np.finfo(arr.dtype).eps

err_msg = f”{seed=} {nx=}, {ny=} {tol=}”

np.testing.assert_allclose(

arr_sob,

np.zeros_like(arr),

err_msg=err_msg,

atol=tol, # rtol is useless for desired=zeros

)

This code snippet can be run from the command-line with

$ pytest -qq -s -x -vv –durations=0 test_zero_constant.py

### Alpha Version

Of course our tests will currently fail, but they are enough for now. Let’s implement a first version.

from typing import Tuple

import numpy as np

from numpy.core.multiarray import normalize_axis_index

from numpy.typing import NDArray

def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:

if arr.ndim != 2:

raise NotImplementedError

if any(

normalize_axis_index(ax, arr.ndim) != i

for i, ax in zip(range(2), axes)

):

raise NotImplementedError

# Define our filter kernels. Notice they inherit the input type, so

# that a float32 input never has to be cast to float64 for computation.

# But can you see where using another dtype for Gx and Gy might make

# sense for some input dtypes?

Gx = np.array(

[[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],

dtype=arr.dtype,

)

Gy = np.array(

[[-1, -2, -1], [0, 0, 0], [1, 2, 1]],

dtype=arr.dtype,

)

# Create the output array and fill with zeroes

s = np.zeros_like(arr)

for ix in range(1, arr.shape[0] – 1):

for iy in range(1, arr.shape[1] – 1):

# Pointwise multiplication followed by sum, aka convolution

s1 = (Gx * arr[ix – 1 : ix + 2, iy – 1 : iy + 2]).sum()

s2 = (Gy * arr[ix – 1 : ix + 2, iy – 1 : iy + 2]).sum()

s[ix, iy] = np.hypot(s1, s2) # np.sqrt(s1**2 + s2**2)

return s

With this new function, all our tests should pass, and we should get an output like this:

$ pytest -qq -s -x -vv –durations=0 test_zero_constant.py

……..

======================================== slowest durations =========================================

0.09s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float16]

0.08s call t_049988eae7f94139a7067f142bf2852f.py::test_zero[float64]

0.06s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float128]

0.04s call t_049988eae7f94139a7067f142bf2852f.py::test_zero[float128]

0.04s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float64]

0.02s call t_049988eae7f94139a7067f142bf2852f.py::test_constant[float32]

0.01s call t_049988eae7f94139a7067f142bf2852f.py::test_zero[float16]

(17 durations < 0.005s hidden. Use -vv to show these durations.)

8 passed in 0.35s

We have so far:

Gathered requirements of our problem.Sketched an initial design.Implemented some tests.Implemented the alpha version, which passes these tests.

These tests provide *verification* for future versions, as well as a very rudimentary *oracle library*. But while unit tests are great at capturing minute deviations from expected results, they are not great at differentiating wrong results from quantitatively different — but still correct — results. Suppose tomorrow we want to implement another Sobel-type operator, which has a longer kernel. We don’t expect the results to match exactly to our current version, but we do expect the outputs of both functions to be qualitatively very similar.

In addition, it is excellent practice to try many different inputs to our functions to get a sense of how they behave for different inputs. This ensures that we scientifically validate the results.

Scikit-image has an excellent library of images, which we can use to create oracles.

# !pip installscikit-image pooch

from typing import Dict, Callable

import numpy as np

import skimage.data

bwimages: Dict[str, np.ndarray] = {}

for attrname in skimage.data.__all__:

attr = getattr(skimage.data, attrname)

# Data are obtained via function calls

if isinstance(attr, Callable):

try:

# Download the data

data = attr()

# Ensure it is a 2D array

if isinstance(data, np.ndarray) and data.ndim == 2:

# Convert from various int types to float32 to better

# assess precision

bwimages[attrname] = data.astype(np.float32)

except:

continue

# Apply sobel to images

bwimages_sobel = {k: sobel(v) for k, v in bwimages.items()}

Once we have the data, we can plot it.

import matplotlib.pyplot as plt

from mpl_toolkits.axes_grid1 import make_axes_locatable

def create_colorbar(im, ax):

divider = make_axes_locatable(ax)

cax = divider.append_axes(“right”, size=”5%”, pad=0.1)

cb = ax.get_figure().colorbar(im, cax=cax, orientation=”vertical”)

return cax, cb

for name, data in bwimages.items():

fig, axs = plt.subplots(

1, 2, figsize=(10, 4), sharex=True, sharey=True

)

im = axs[0].imshow(data, aspect=”equal”, cmap=”gray”)

create_colorbar(im, axs[0])

axs[0].set(title=name)

im = axs[1].imshow(bwimages_sobel[name], aspect=”equal”, cmap=”gray”)

create_colorbar(im, axs[1])

axs[1].set(title=f”{name} sobel”)Figure 2. “Binary blobs” dataset before (left) and after (right) Sobel filtering. Credit: own work.Figure 3. “Text” dataset before (left) and after (right) Sobel filtering. Credit: own work.

These look really good! I would recommend storing these, both as arrays and as figures which I can quickly check against for a new version. I highly recommend HD5F for array storage. You can also set up a Sphinx Gallery to directly generate the figures them when updating documentation, that way you have a reproducible figure build that you can use to check against previous versions.

After the results have been validated, you can store them on disk and use them as part of your unit testing. Something like this:

oracle_library = [(k, v, bwimages_sobel[k]) for k, v in bwimages.items()]

# save_to_disk(oracle_library, …)# test_oracle.py

import numpy as np

import pytest

from numpy.typing import NDArray

# oracle_library = read_from_disk(…)

@pytest.mark.parametrize(“name,input,output”, oracle_library)

def test_oracles(name: str, input: NDArray, output: NDArray):

output_new = sobel(input)

tol = 10 * np.finfo(input.dtype).eps

mean_avg_error = np.abs(output_new – output).mean()

np.testing.assert_allclose(

output_new,

output,

err_msg=f”{name=} {tol=} {mean_avg_error=}”,

atol=tol,

rtol=tol,

)

### Profiling

Computing the Sobel filter for these datasets took a while! So the next step is to profile the code. I recommend creating a benchmark_xyz.py file for each test, and re-run them regularly to probe for performance regressions. This can even be part of your unit testing, but we won’t go so far in this example. Another idea is to use the oracles above for benchmarking.

There are many ways of timing code execution. To get the system-wide, “real” elapsed time, use perf_counter_ns from the built-in time module to measure time in nanoseconds. In a Jupyter notebook, the built-in %%timeit cell magic times a certain cell execution. The decorator below is inspired by this cell magic and can be used to time any function.

import time

from functools import wraps

from typing import Callable, Optional

def sizeof_fmt(num, suffix=”s”):

for unit in [“n”, “μ”, “m”]:

if abs(num) < 1000:

return f”{num:3.1f} {unit}{suffix}”

num /= 1000

return f”{num:.1f}{suffix}”

def timeit(

func_or_number: Optional[Callable] = None,

number: int = 10,

) -> Callable:

“””Apply to a function to time its executions.

Parameters

———-

func_or_number : Optional[Callable], optional

Function to be decorated or `number` argument (see below), by

default None

number : int, optional

Number of times the function will run to obtain statistics, by

default 10

Returns

——-

Callable

When fed a function, returns the decorated function. Otherwise

returns a decorator.

Examples

——–

.. code-block:: python

@timeit

def my_fun():

pass

@timeit(100)

def my_fun():

pass

@timeit(number=3)

def my_fun():

pass

“””

if isinstance(func_or_number, Callable):

func = func_or_number

number = number

elif isinstance(func_or_number, int):

func = None

number = func_or_number

else:

func = None

number = number

def decorator(f):

@wraps(f)

def wrapper(*args, **kwargs):

runs_ns = np.empty((number,))

# Run first and measure store the result

start_time = time.perf_counter_ns()

result = f(*args, **kwargs)

runs_ns[0] = time.perf_counter_ns() – start_time

for i in range(1, number):

start_time = time.perf_counter_ns()

f(*args, **kwargs) # Without storage, faster

runs_ns[i] = time.perf_counter_ns() – start_time

time_msg = f”{sizeof_fmt(runs_ns.mean())} ± “

time_msg += f”{sizeof_fmt(runs_ns.std())}”

print(

f”{time_msg} per run (mean ± std. dev. of {number} runs)”

)

return result

return wrapper

if func is not None:

return decorator(func)

return decorator

Putting our function to the test:

arr_test = np.random.randn(500, 500)

sobel_timed = timeit(sobel)

sobel_timed(arr_test);

# 3.9s ± 848.9 ms per run (mean ± std. dev. of 10 runs)

Not too fast 🙁

When investigating slowness or performance regressions, it is also possible to track the runtime of each line. The line_profiler library is an excellent resource for this. It can be used via Jupyter cell magic, or using the API. Here is an API example:

from line_profiler import LineProfiler

lp = LineProfiler()

lp_wrapper = lp(sobel)

lp_wrapper(arr_test)

lp.print_stats(output_unit=1) # 1 for seconds, 1e-3 for milliseconds, etc.

Here is an example output:

Timer unit: 1 s

Total time: 4.27197 s

File: /tmp/ipykernel_521529/1313985340.py

Function: sobel at line 8

Line # Hits Time Per Hit % Time Line Contents

==============================================================

8 def sobel(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:

9 # Only accepts 2D arrays

10 1 0.0 0.0 0.0 if arr.ndim != 2:

11 raise NotImplementedError

12

13 # Ensure that the axis[0] is the first axis, and axis[1] is the second

14 # axis. The obscure `normalize_axis_index` converts negative indices to

15 # indices between 0 and arr.ndim – 1.

16 1 0.0 0.0 0.0 if any(

17 normalize_axis_index(ax, arr.ndim) != i

18 1 0.0 0.0 0.0 for i, ax in zip(range(2), axes)

19 ):

20 raise NotImplementedError

21

22 1 0.0 0.0 0.0 Gx = np.array(

23 1 0.0 0.0 0.0 [[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],

24 1 0.0 0.0 0.0 dtype=arr.dtype,

25 )

26 1 0.0 0.0 0.0 Gy = np.array(

27 1 0.0 0.0 0.0 [[-1, -2, -1], [0, 0, 0], [1, 2, 1]],

28 1 0.0 0.0 0.0 dtype=arr.dtype,

29 )

30 1 0.0 0.0 0.0 s = np.zeros_like(arr)

31 498 0.0 0.0 0.0 for ix in range(1, arr.shape[0] – 1):

32 248004 0.1 0.0 2.2 for iy in range(1, arr.shape[1] – 1):

33 248004 1.8 0.0 41.5 s1 = (Gx * arr[ix – 1 : ix + 2, iy – 1 : iy + 2]).sum()

34 248004 1.7 0.0 39.5 s2 = (Gy * arr[ix – 1 : ix + 2, iy – 1 : iy + 2]).sum()

35 248004 0.7 0.0 16.8 s[ix, iy] = np.hypot(s1, s2)

36 1 0.0 0.0 0.0 return s

Lot’s of time is spend inside the innermost loop. NumPy prefers vectorized math, as it can then rely on compiled code. Since we are using explicit for loops, it makes sense that this function is very slow.

In addition, it is important to be smart about memory allocations inside of loops. NumPy is somewhat smart about allocating small amounts of memory inside loops, so the lines defining s1 or s2 might not be allocating new memory. But they also could be, or there could be some other memory allocation that is happening under the hood that we are not aware of. I therefore recommend also profiling memory. I like using Memray for that, but there are others like Fil and Sciagraph. I would also avoid memory_profiler which (very unfortunately!) is no longer maintained. Also Memray is much more powerful. Here is an example of Memray in action:

$ # Create sobel.bin which holds the profiling information

$ memray run -fo sobel.bin –trace-python-allocators sobel_run.py

Writing profile results into sobel.bin

Memray WARNING: Correcting symbol for aligned_alloc from 0x7fc5c984d8f0 to 0x7fc5ca4a5ce0

[memray] Successfully generated profile results.

You can now generate reports from the stored allocation records.

Some example commands to generate reports:

python3 -m memray flamegraph sobel.bin$ # Generate flame graph

$ memray flamegraph -fo sobel_flamegraph.html –temporary-allocations sobel.bin

⠙ Calculating high watermark… ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 99% 0:00:0100:01

⠏ Processing allocation records… ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 98% 0:00:0100:01

Wrote sobel_flamegraph.html$ # Show memory tree

$ memray tree –temporary-allocations sobel.bin

⠧ Calculating high watermark… ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01

⠧ Processing allocation records… ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸ 100% 0:00:0100:01

Allocation metadata

——————-

Command line arguments:

‘memray run -fo sobel.bin –trace-python-allocators sobel_run.py’

Peak memory size: 11.719MB

Number of allocations: 15332714

Biggest 10 allocations:

———————–

📂 123.755MB (100.00 %) <ROOT>

└── [[3 frames hidden in 2 file(s)]]

└── 📂 123.755MB (100.00 %) _run_code /usr/lib/python3.10/runpy.py:86

├── 📂 122.988MB (99.38 %) <module> sobel_run.py:40

│ ├── 📄 51.087MB (41.28 %) sobel sobel_run.py:35

│ ├── [[1 frames hidden in 1 file(s)]]

│ │ └── 📄 18.922MB (15.29 %) _sum

│ │ lib/python3.10/site-packages/numpy/core/_methods.py:49

│ └── [[1 frames hidden in 1 file(s)]]

│ └── 📄 18.921MB (15.29 %) _sum

│ lib/python3.10/site-packages/numpy/core/_methods.py:49

…Figure 4. Memray flamegraph for the alpha version. Credit: own work.

### Beta Version and a Bug

Now that we have a working alpha and some profiling functions, we will leverage the SciPy library to obtain a much faster version.

from typing import Tuple

import numpy as np

from numpy.core.multiarray import normalize_axis_index

from numpy.typing import NDArray

from scipy.signal import convolve2d

def sobel_conv2d(

arr: NDArray, axes: Tuple[int, int] = (-2, -1)

) -> NDArray:

if arr.ndim != 2:

raise NotImplementedError

if any(

normalize_axis_index(ax, arr.ndim) != i

for i, ax in zip(range(2), axes)

):

raise NotImplementedError

# Create the kernels as a single, complex array. Allows us to use

# np.abs instead of np.hypot to calculate the magnitude.

G = np.array(

[[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]],

dtype=arr.dtype,

)

G = G + 1j * np.array(

[[-1, -2, -1], [0, 0, 0], [1, 2, 1]],

dtype=arr.dtype,

)

s = convolve2d(arr, G, mode=”same”)

np.absolute(s, out=s) # In-place abs

return s.realsobel_timed = timeit(sobel_conv2d)

sobel_timed(arr_test)

# 14.3 ms ± 1.71 ms per loop (mean ± std. dev. of 10 runs)

Much better! But is it right?

Figure 5. “Microaneurysms” dataset before (left) and after (center and right) Sobel filtering using both versions. Credit: own work.

The images look very similar, but if you notice the color scale, they are not the same. Running the tests flags a small mean average error. Thankfully, we are now well-equipped at detecting quantitative and qualitative differences.

After investigating this bug, we attribute it to the different boundary conditions. Looking into convolve2d documentation tells us that the input array is padded with zeroes before applying the kernel. In the alpha, we padded the *output*!

Which one is right? Arguably the SciPy implementation makes more sense. In this case we should adopt the new version as the oracle, and if required, fix the alpha version to match it. This is common in scientific software development: new information of how to do things better changes the oracles and the tests.

In this case, the fix is easy, pad the array with zeros prior to processing it.

def sobel_v2(arr: NDArray, axes: Tuple[int, int] = (-2, -1)) -> NDArray:

# …

arr = np.pad(arr, (1,)) # After padding, it is shaped (nx + 2, ny + 2)

s = np.zeros_like(arr)

for ix in range(1, arr.shape[0] – 1):

for iy in range(1, arr.shape[1] – 1):

s1 = (Gx * arr[ix – 1 : ix + 2, iy – 1 : iy + 2]).sum()

s2 = (Gy * arr[ix – 1 : ix + 2, iy – 1 : iy + 2]).sum()

s[ix – 1, iy – 1] = np.hypot(s1, s2) # Adjust indices

return s

Once we corrected out function, we can update the oracles and tests which rely on them.

### Final Thoughts

We saw how to put in practice some of the software development ideas explored in this article. We also saw some tools which you can use to develop high-quality, high-performance code.

I suggest you try some of these ideas on your own projects. In particular, practice profiling code and improving it. The Sobel filter function we coded is very inefficient, I suggest trying to improve it.

For example, try CPU parallelization with a JIT compiler such as Numba, porting the inner loop into Cython, or implementing a CUDA GPU function with Numba or CuPy. Feel free to check out my series on coding CUDA kernels with Numba.

Developing Scientific Software was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.