Purple Air / Kano Pixel

I had three of these little Kano pixel units which I bought for the kids to learn some programming on when they were on sale. Turns out the kids didn’t really use them much, and now Kano seems to be history.

The boards are based on the ESP-32 WROOM microcontroller, and can be programmed using the Arduino IDE, but they can also run MicroPython – a version of the Python programming language that runs on microcontrollers like this one. Furthermore, there is already a library which adds support for the LED matrix (128 NeoPixels), the buttons and the rotary dial on the top. Sadly, it does not have support for the USB connectors, or the built-in microphone (which I suspect is connected via the 4th USB channel that the hub IC on the board provides).

Once I had it running Python, my next thought was what could I do with it…

Converting to MicroPython

Converting it to run MicroPython was relatively straightforward following the instructions on the Instructables website. The one change I made was upgrading to the latest version of MicroPython after the conversion using the command line esptool to program the latest version. That also necessitated a small change to the PixelKit library code. I have a fork of the original library in my GitHub account that has these changes (and may get additional ones as I play more with these boards).

An Application

I have had a PurpleAir sensor running at our house for several years now, and one of the things I have always wanted to do with it was have a simple way of getting the current AQI value from it displayed. While you can get the data from the PurpleAir map, which I have as a web app on my phone for relatively quick access, having an always on display appealed.

PurpleAir do now have an API, but after you have used the initial credits they give you for free, you need to pay for it. Since I was only interested in my own sensor, and only from within the house, I started looking at whether I could build a simple web-scraping app to access the data from the device’s embedded web UI. Turns out, you don’t need to do any scraping at all: the device provides a JSON endpoint that returns all the data in a simple JSON structure.

The plan was to have two displays:

  1. A chart showing the last 16 values (just under three minutes of data since it updates every 10 seconds). 16 because that is the number of columns on the Kano Pixel’s LED matrix.
  2. A second mode showing the current AQI as a number rendered on the matrix.

The dial on the top would be used to switch between the modes. When set to “1” it would show the graph, and when turned to “2”, the current AQI number.

Collecting the Data

To get the JSON data, a simple HTTP GET to http://<IP>/json?live=true is all that is needed. That returns a flat JSON object, inside which we find, among many other things, these values:

  • pm2.5_aqi: The current PM 2.5 AQI value from the primary sensor (the PurpleAir units have two sensors to allow for fault detection)
  • pm2.5_aqi_b: The PM 2.5 AQI value from the secondary sensor
  • p25aqic: An RGB color tuple that follows the PurpleAir color scheme for AQI values
  • current_temp_f: The current temperature inside the sensor, in Fahrenheit
  • current_humidity: Current humidity (as a percentage)
  • current_dewpoint_f: Current dew point temperature, in Fahrenheit
  • pressure: Current air pressure

There are a lot more values that I might use in future, or in other projects, but for now this set is more than I needed for the first version.

MicroPython supports the asyncio package, which allows me to schedule tasks concurrently on the small board relatively easily, so I created a Python class to wrap the bulk of the code, and a simple function I could add to the end of the MicroPython main.py file. I added a check for one of the buttons too so I could boot the board and not run the app if I needed to:

try:
        import aqi
        if kit.button_b.value() == 1:
                aqi.run_aqi()
        else:
                print("Not starting AQI - button pressed\n")
except:
        print("AQI Not found\n")

The run_aqi() function simply does this:

def run_aqi():

  kit.clear()
  kit.render()

  aqi = AQI()
  asyncio.create_task(aqi.check_dial())
  asyncio.run(aqi.run())

That clears the matrix display, creates the AQI instance, starts a task to monitor the position of the dial control (more on that later) and then runs the main thread which collects the data and updates the display every ten seconds.

The run() method itself is also pretty simple, just collecting the data and then calling another method to render the current display. We collect and maintain all the data so, for example, switching from the number to the graph has the last 16 values ready to be displayed.

  def run(self):
    while True:
      d = self.getData()
      if d is not None:
        self.update_aqi(d)
        self.update_aqi_colors(d)
        self.update_others(d)

	      # Render the updated information (depending on mode)
	      self.render()
      else:
        print("Error getting result\n")
        
      # wait for 10 seconds  
      await asyncio.sleep(10)

At the end, we use the asyncio.sleep() function to wait 10 seconds before collecting the next data point.

When it comes to storing the data, for the AQI and the color associated with it, we use Python lists of 16 elements. Each time we read a new value for either, we add it to the end of the list (which was initialized with all zeros for the AQI and (0, 0, 0) tuples for the color) and remove one from the beginning of the list. The 16 elements then represent the values needed for each bar in the bar chart. For the other values, we are currently recording only the latest value (this may change in future if we add more modes to the display).

  def update_aqi(self,d):
    aqi_a = d["pm2.5_aqi"]
    aqi_b = d["pm2.5_aqi_b"]
    # Push a new value to the end of the list
    aqi = (aqi_a + aqi_b)/2
    self.results.append(aqi)
    self.results.pop(0)

  def update_aqi_colors(self,d):
    # Use channel a for color
    col = d["p25aqic"]
    col = tuple(map(lambda x: int(int(x)/10), col[4:-1].split(",")))
    self.colors.append(col)
    self.colors.pop(0)

  def update_others(self, d):
    try:
      self.temp = d["current_temp_f"]
      self.humidity = d["current_humidity"]
      self.dew = d["current_dewpoint_f"]
      self.pressure = d["pressure"]
    except Exception as e: 
      print("Exception getting other metrics: %s\n" % str(e))

The Dial

The dial, while being marked with five positions (“1”, “2”, “3”, a camera icon and a “star” icon) is actually a potentiometer connected to one of the ESP32’s ADC pins and returns values between 0 and 4095 depending on its position. To simplify things a little, I am dividing the returned value by 256 ( >> 8 ), giving me values between 0 and 15. Still more than I needed, but I discovered that the positions are not quite evenly distributed with the two ends being a little closer to their neighbors on the dial than the middle three are. With 16 positions, I was able to create this mapping between the returned value and the “mode” shown on the dial:

  DIAL_MAP = [ "3", "3",
               "2", "2", "2", "2",
               "1", "1", "1", "1",
               "C", "C", "C", "C",
               "S", "S"]

With this in place, it was a simple task to create a method that could run regularly to check for changes in the dial position, and another to return the current mode (which could be called at startup for example):

  def on_dial(self, dial):
    print("Dial changed to {}\n".format(dial))
    self.mode = dial
    self.render()

  def get_dial(self):
    dv = kit.dial.read() >> 8
    if dv >= 0 and dv <= 15:
      return AQI.DIAL_MAP[dv]
    return "?"

  def check_dial(self):
    old_dial = self.get_dial()
    while True:
      dial = self.get_dial()
      if dial != old_dial:
        self.on_dial(dial)
        old_dial = dial
      await asyncio.sleep(0.25)

I added a third in there, on_dial(), following a model from the PixelKit, but in hindsight I think that could easily be removed.

The Graph

The graph is a simple plot of the data in the list, but we auto-scale it so that it looks good on the matrix no matter the range of values.

  def vline(self, x, y, h, color):
    for i in range(y, y+h):
      kit.set_pixel(x, i, color)

  def render_aqi_graph(self):
    mx = max(self.results)
    mx = math.ceil(mx / 8.0) * 8
    scale = int(mx/8)+1

    kit.clear()

    for i in range(len(self.results)):
      aqi = self.results[i]
      c = self.colors[i]
      l = round(aqi/scale)
      self.vline(i, 8-l, l, c)

    kit.render()

The vline() method draws a vertical line from the specified coordinate of the specified length, in the specified color (which is expected to be an RGB tuple). The matrix has its origin in the top left corner, hence the adjustment to the starting coordinate in the call to vline().

The way the PixelKit library works is that updates to the matrix are made in a buffer, and then the call to kit.render() pushes them onto the display quickly. This results in a very smooth transition as the graph changes, and no noticeable flicker.

The AQI Value

The AQI value makes use of a MicroPython library called BitmapFont, which was created by Adafruit, but is now deprecated as they have moved all their development effort to CircuitPython (the Python version I used for the Halloween skull project). It includes a 5 x 8 pixel font, which is perfect for displaying two digit numbers on the Kano (each character includes a 1 pixel space, so they are actually 6 x 8, meaning the 16 pixel wide display cannot quite do three digits). The library was modified to use the PixelKit functions to set pixel values. Thinking of adding this to the PixelKit code, but haven’t done that yet.

Showing the number, including centering it when it is a single digit, looks like this:

  def render_aqi_value(self):
    kit.clear()
    aqi_str = "%d" % self.results[-1]
    color = self.colors[-1]
    with BitmapFont(16, 8) as bf:
      pixlen = bf.width(aqi_str)
      x = (18 - pixlen) >> 1
      bf.text(aqi_str, x, 0, color)
    kit.render()

The AQI value ends up looking like this:

What’s Next?

The plan for the next part is to get the 12 year old to help me write something that can show the other collected values, and maybe graph things like temperature over a longer timeframe (16 hours or maybe even 16 days).

I also want to see whether I can find the pins for the USB hub and get that working, and I’d also like to see whether there is anything that monitors the battery level (the Kano Pixel has a Li-Ion battery). I don’t recall whether the original firmware mentioned a way to get the battery level, so it may not have a monitor connected to it.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.