Table of Contents

KY-040 - Rotary Encoder

360 Degrees Rotary Encoder Module with 15×16.5 mm Potentiometer Rotary Knob Cap

Specification

Code

Library

ky040.py
# Author: Ignas Bukys
# Copyright Ignas Bukys 2022 Released under the MIT license
# Based on Kevin Köck and Peter Hinch's works for rotary encoder and async switch.
# Created on 2022-09-10 
 
__updated__ = "2022-09-10"
__version__ = "0.1"
 
import uasyncio as asyncio
import time
import micropython
from machine import Pin
 
_DIR_CW = const(0x10)  # Clockwise step
_DIR_CCW = const(0x20)  # Counter-clockwise step
 
# Rotary Encoder States
_R_START = const(0x0)
_R_CW_1 = const(0x1)
_R_CW_2 = const(0x2)
_R_CW_3 = const(0x3)
_R_CCW_1 = const(0x4)
_R_CCW_2 = const(0x5)
_R_CCW_3 = const(0x6)
_R_ILLEGAL = const(0x7)
 
_R_transition_table = [
 
    # |------------- NEXT STATE -------------|            |CURRENT STATE|
    # CLK/DT    CLK/DT     CLK/DT    CLK/DT
    #   00        01         10        11
    [_R_START, _R_CCW_1, _R_CW_1,  _R_START],             # _R_START
    [_R_CW_2,  _R_START, _R_CW_1,  _R_START],             # _R_CW_1
    [_R_CW_2,  _R_CW_3,  _R_CW_1,  _R_START],             # _R_CW_2
    [_R_CW_2,  _R_CW_3,  _R_START, _R_START | _DIR_CW],   # _R_CW_3
    [_R_CCW_2, _R_CCW_1, _R_START, _R_START],             # _R_CCW_1
    [_R_CCW_2, _R_CCW_1, _R_CCW_3, _R_START],             # _R_CCW_2
    [_R_CCW_2, _R_START, _R_CCW_3, _R_START | _DIR_CCW],  # _R_CCW_3
    [_R_START, _R_START, _R_START, _R_START]]             # _R_ILLEGAL
 
_R_transition_table_half_step = [
    [_R_CW_3,            _R_CW_2,  _R_CW_1,  _R_START],
    [_R_CW_3 | _DIR_CCW, _R_START, _R_CW_1,  _R_START],
    [_R_CW_3 | _DIR_CW,  _R_CW_2,  _R_START, _R_START],
    [_R_CW_3,            _R_CCW_2, _R_CCW_1, _R_START],
    [_R_CW_3,            _R_CW_2,  _R_CCW_1, _R_START | _DIR_CW],
    [_R_CW_3,            _R_CCW_2, _R_CW_3,  _R_START | _DIR_CCW],
    [_R_START,           _R_START, _R_START, _R_START],
    [_R_START,           _R_START, _R_START, _R_START]]
 
_STATE_MASK = const(0x07)
_DIR_MASK = const(0x30)
 
type_gen = type((lambda: (yield))())  # Generator type
 
 
# If a callback is passed, run it and return.
# If a coro is passed initiate it and return.
# coros are passed by name i.e. not using function call syntax.
def launch(func, tup_args):
    res = func(*tup_args)
    if isinstance(res, type_gen):
        loop = asyncio.get_event_loop()
        loop.create_task(res)
 
 
class ky040:
    debounce_ms = 50
    long_press_ms = 1000
    double_click_ms = 400
 
    def __init__(self, btn_num, clk_num, dt_num, rotary_reverse = False, suppress=True):
        self.pin = Pin(btn_num, Pin.IN, Pin.PULL_UP)
        self._supp = suppress  # don't call release func after long press
        self._if = None  # increment function
        self._ef = None  # decrement function
        self._tf = None  # pressed function
        self._ff = None  # released function
        self._df = None  # double pressed function
        self._lf = None  # long pressed function
        self._value = 0
        self._state = _R_START
        self._listener = []
        self._reverse = -1 if rotary_reverse else 1
 
        self._pin_clk = Pin(clk_num, Pin.IN)
        self._pin_dt = Pin(dt_num, Pin.IN)
 
        self._hal_enable_irq()
 
        self.sense = self.pin.value()  # Convert from electrical to logical value
        self.btn_state = self.rawstate()  # Initial state
        loop = asyncio.get_event_loop()
        loop.create_task(self.buttoncheck())  # Thread runs forever
 
    def add_listener(self, l):
        self._listener.append(l)
 
    def remove_listener(self, l):
        if l not in self._listener:
            raise ValueError('{} is not an installed listener'.format(l))
        self._listener.remove(l)
 
    def increment_func(self, func, args=()):
        self._if = func
        self._ia = args
 
    def decrement_func(self, func, args=()):
        self._ef = func
        self._ea = args
 
    def press_func(self, func, args=()):
        self._tf = func
        self._ta = args
 
    def release_func(self, func, args=()):
        self._ff = func
        self._fa = args
 
    def double_func(self, func, args=()):
        self._df = func
        self._da = args
 
    def long_func(self, func, args=()):
        self._lf = func
        self._la = args
 
    def _enable_clk_irq(self, callback=None):
        self._pin_clk.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=callback)
 
    def _enable_dt_irq(self, callback=None):
        self._pin_dt.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=callback)
 
    def _disable_clk_irq(self):
        self._pin_clk.irq(handler=None)
 
    def _disable_dt_irq(self):
        self._pin_dt.irq(handler=None)
 
    def _hal_get_clk_value(self):
        return self._pin_clk.value()
 
    def _hal_get_dt_value(self):
        return self._pin_dt.value()
 
    def _hal_enable_irq(self):
        self._enable_clk_irq(self._process_rotary_pins)
        self._enable_dt_irq(self._process_rotary_pins)
 
    def _hal_disable_irq(self):
        self._disable_clk_irq()
        self._disable_dt_irq()
 
    def _hal_close(self):
        self._hal_disable_irq()
 
    def _process_rotary_pins(self, pin):
        old_value = self._value
        clk_dt_pins = (self._hal_get_clk_value() << 1) | self._hal_get_dt_value()
 
        # Determine next state
        self._state = _R_transition_table[self._state & _STATE_MASK][clk_dt_pins]
        direction = self._state & _DIR_MASK
 
        incr = 0
        if direction == _DIR_CW:
            incr = 1
        elif direction == _DIR_CCW:
            incr = -1
 
        incr *= self._reverse
 
        if incr > 0 and self._if:
            # launch(self._if, self._ia)
            micropython.schedule(self._if, self._ia)
        elif incr < 0 and self._ef:
            # launch(self._ef, self._ea)
            micropython.schedule(self._ef, self._ea)
 
    # Current non-debounced logical button state: True == pressed
    def rawstate(self):
        return bool(self.pin.value() ^ self.sense)
 
    # Current debounced state of button (True == pressed)
    def __call__(self):
        return self.btn_state
 
    async def buttoncheck(self):
        t_change = None
        supp = False
        clicks = 0
        lpr = False  # long press ran
        ####
        # local functions for performance improvements
        deb = self.debounce_ms
        dcms = self.double_click_ms
        lpms = self.long_press_ms
        raw = self.rawstate
        ticks_diff = time.ticks_diff
        ticks_ms = time.ticks_ms
        #
        while True:
            btn_state = raw()
            if btn_state is False and self.btn_state is False and self._supp and \
                    ticks_diff(ticks_ms(), t_change) > dcms and clicks > 0 and self._ff:
                clicks = 0
                # launch(self._ff, self._fa)
                micropython.schedule(self._ff, self._fa)
            elif btn_state is True and self.btn_state is True:
                if clicks > 0 and ticks_diff(ticks_ms(), t_change) > dcms:
                    # double click timeout
                    clicks = 0
                if self._lf and lpr is False:  # check long press
                    if ticks_diff(ticks_ms(), t_change) >= lpms:
                        lpr = True
                        clicks = 0
                        if self._supp is True:
                            supp = True
                        # launch(self._lf, self._la)
                        micropython.schedule(self._lf, self._la)
            elif btn_state != self.btn_state:  # state changed
                lpr = False
                self.btn_state = btn_state
                if btn_state is True:  # Button pressed: launch pressed func
                    if ticks_diff(ticks_ms(), t_change) > dcms:
                        clicks = 0
                    if self._df:
                        clicks += 1
                    if clicks == 2:  # double click
                        clicks = 0
                        if self._supp is True:
                            supp = True
                        # launch(self._df, self._da)
                        micropython.schedule(self._df, self._da)
                    elif self._tf:
                        # launch(self._tf, self._ta)
                        micropython.schedule(self._tf, self._ta)
                else:  # Button released. launch release func
                    if supp is True:
                        supp = False
                    elif clicks and self._supp > 0:
                        pass
                    elif self._ff:  # not after a long press with suppress
                        # launch(self._ff, self._fa)
                        micropython.schedule(self._ff, self._fa)
                t_change = ticks_ms()
            # Ignore state changes until switch has settled
            await asyncio.sleep_ms(deb)

Example

import sys
import time
from machine import Pin, PWM
import uasyncio as asyncio
from libs.ky040 import ky040
 
time.sleep(3)
 
def map(x, in_min=0, in_max=180, out_min=25, out_max=115):
    return int((x - in_min) * (out_max - out_min) /
                (in_max - in_min) + out_min)
 
 
# example of a class that uses one rotary encoder
class Application1():
    def __init__(self, r1):
        self.r1 = r1
        self.myevent = asyncio.Event()
        asyncio.create_task(self.action())
        r1.add_listener(self.callback)
 
    def callback(self):
        self.myevent.set()
 
    async def action(self):
        while True:
            await self.myevent.wait()
            print('App 1:  rotary = {}'. format(self.r1.value()))
            servo.duty(map(self.r1.value() * 10))
            # do something with the encoder result ...
            self.myevent.clear()
 
 
 
rot = ky040(btn_num = 17, clk_num=13, dt_num=12, rotary_reverse=True)
print("Starting rotary encoder")
 
 
async def main():
    short_press = rot.release_func(print, ("SHORT",))
    double_press = rot.double_func(print, ("DOUBLE",))
    long_press = rot.long_func(print, ("LONG",))
    incr = rot.increment_func(print, ("INCR",))
    decr = rot.decrement_func(print, ("DECR",))
 
    # create tasks that use the rotary encoders
    app1 = Application1(rot)
    while True:
        await asyncio.sleep(1)
 
asyncio.run(main())