Brandmeister Talk Group status on E-Ink display


1. Problem description

If you monitor Brandmeister, for example a Talk Group of interest, you will probably want some kind of view that shows the activity log from while you were away. Pi-Star has its own Last Heard view, which has its pros and cons. The most critical con for me: it shows only the latest call when the same callsign made several calls or held a conversation. You can also use Brandmeister’s Last Heard page, but it has no decent history and works only while the browser is open.

2. Solution variant

I ended up with the following requirements for the solution:

  • should be a separate display (separate from Nextion - which is used for situational awareness)
  • should be tunable to groups/persons of interest (showing only their activity)
  • should have adjustable brightness (need to minimize it during the night)
  • should be autonomous or connectable to existing Pi-Star setup

I chose a WaveShare E-Ink (e-paper) display. Its properties:

  • no LED backlight at all (so color and brightness should not change over time)
  • low power consumption (power is needed only to redraw the picture)
  • 17 seconds to refresh (not an issue for historical data)
  • can be run separately with Raspberry Pi or connected to Pi-Star board (if your Rx/Tx module is connected via USB)
  • has libraries for Python

One of the cons: I couldn’t compile the libraries for ESP32, so there is a Raspberry Pi solution only. Another one: after each reconnect to the Brandmeister stream, the history on the display is flushed.

3. Display layout

I tried several approaches to maximize the amount of information displayed. Here is one of the latest designs:

Display layout

  • 05/27/2022, 16:35:49 date and time of the latest update
  • 16:35 time of the call
  • C symbol which can help to identify Talk Group if you are monitoring several of them
  • AG6PF operator callsign
  • Tim J operator name

The display can show up to 12 Last Heard records.

In my environment the Talk Group symbol is:

  • absent for private calls
  • C for ColoradoHD Talk Group
  • H for Hytera Talk Group

So you can also identify the origin of the call.
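The record line described above can be sketched as a small formatting function. This is only an illustration: `format_record` and `TG_SYMBOLS` are hypothetical names, 31088 is the Talk Group ID used later in the listing, and 99999 is a made-up placeholder for the second Talk Group.

```python
from datetime import datetime

# Hypothetical mapping from destination ID to its one-letter symbol.
# 31088 comes from the listing; 99999 is a placeholder, not a real ID.
TG_SYMBOLS = {31088: "C", 99999: "H"}


def format_record(start, destination_id, callsign, name):
    """Build one display line: time, Talk Group symbol, callsign, name."""
    # Private calls are not in the mapping, so their symbol stays empty.
    symbol = TG_SYMBOLS.get(destination_id, "")
    return f"{start:%H:%M} {symbol} {callsign} {name}"


line = format_record(datetime(2022, 5, 27, 16, 35, 49), 31088, "AG6PF", "Tim J")
print(line)
```

For a private call the symbol column is simply left empty, which matches the behaviour described in the list above.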

4. Program

You need to install the waveshare_epd library following the manufacturer’s documentation. It comes with the required font files.

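Additional Python requirements:

python-socketio (the listing imports socketio; the websocket transport also needs websocket-client)
python-dateutil
Pillow (imported as PIL), if the waveshare_epd installation did not already pull it in

json is part of the Python standard library and needs no installation.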

The program connects to the Brandmeister Last Heard stream and parses each event with a simple set of filters. Additional conditions exclude duplicate records, because Brandmeister treats the start and the end of a TX as separate events. Sometimes events ‘from the past’ appear in the queue, so one more filter with a time check removes them.
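The filter chain can be read as a single predicate. The sketch below restates the conditions from the listing as a standalone function so that each one has a name. `is_wanted`, `MY_ID`, `WATCHED_TG` and `MAX_AGE` are hypothetical names introduced here; the event field names are the ones the listing reads from the stream.

```python
from datetime import datetime, timedelta

MY_ID = 1234567                 # your DMR ID (placeholder from the listing)
WATCHED_TG = 31088              # Talk Group to monitor (from the listing)
MAX_AGE = timedelta(minutes=5)  # events older than this count as 'from the past'


def is_wanted(call, seen_ids, now):
    """Return True if the event should be added to the display history."""
    addressed = call["DestinationID"] in (WATCHED_TG, MY_ID)
    own_tx = call["SourceID"] == MY_ID
    # The listing keeps only events whose Stop field is non-zero.
    has_stop = call["Stop"] != 0
    duplicate = call["SessionID"] in seen_ids
    recent = datetime.fromtimestamp(call["Start"]) > now - MAX_AGE
    return addressed and not own_tx and has_stop and not duplicate and recent
```

Passing `now` and `seen_ids` in as arguments is a choice made for this sketch so the function can be tried out without a live stream.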

Filtered events go into a double-ended queue. A display refresh is triggered after a record is added to the queue.
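A minimal sketch of how a bounded deque behaves as a rolling history, and of the "newest ID changed" check that triggers a redraw. `needs_redraw` is a hypothetical helper, and the queue length of 3 is used only to keep the example short (the listing uses 12).

```python
from collections import deque

history = deque(maxlen=3)  # the listing uses maxlen=12

for n in range(1, 6):
    # Once the deque is full, each append silently drops the oldest entry.
    history.append(f"call {n}")

# The display shows the newest record first.
newest_first = list(reversed(history))
print(newest_first)


def needs_redraw(ids, last_drawn_id):
    """Redraw only when the newest queued ID differs from the one last drawn."""
    return len(ids) > 0 and ids[-1] != last_drawn_id
```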

Values to replace in the listing:

  • 1234567 your DMR ID
  • 31088 TG or personal ID to monitor

Listing:

import socketio
import json
from collections import deque
import _thread
from waveshare_epd import epd4in2
from PIL import Image, ImageDraw, ImageFont
from random import randrange
from time import sleep
from datetime import datetime, timedelta
from dateutil import tz

dq_id = deque(maxlen=12)
dq_str = deque(maxlen=12)

saved_id = ""

sio = socketio.Client()

def draw_on_display():
    global saved_id
    print("Thread started")
    epd = epd4in2.EPD()

    font24 = ImageFont.truetype("Font.ttc", 24)
    # font18 = ImageFont.truetype("Font.ttc", 18)

    epd.init()

    while True:
        # if len(dq_id) > 0 and saved_id == "":
        #     saved_id = dq_id[0]
        sleep(0.02)
        if len(dq_id) > 0:
            if dq_id[-1] != saved_id:
                # print("Start drawing.")
                # print(datetime.now())
                saved_id = dq_id[-1]

                epd.Clear()

                Limage = Image.new(
                    "1", (epd.height, epd.width), 255
                )  # 255: clear the frame
                draw = ImageDraw.Draw(Limage)
                draw.text(
                    (2 + randrange(8), 0),
                    datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                    font=font24,
                    fill=0,
                )

                for i in range(len(dq_id) - 1, -1, -1):
                    draw.text(
                        (2, 7 + 30 * (len(dq_id) - i)), dq_str[i], font=font24, fill=0,
                    )

                # epd.display(epd.getbuffer(Limage.rotate(180)))
                epd.display(epd.getbuffer(Limage))
                # epd.sleep()
                # Sleep is required by documentation but not working properly in my setup


                # print("End drawing.")
                # print(datetime.now())

@sio.event
def connect():
    print('connected to server')
    
@sio.event
def disconnect():
    print('disconnected from server')
    
@sio.on("mqtt")
def on_mqtt(data):
    # print (data)
    global last_message_id
    call = json.loads(data['payload'])
    # Keep only calls to the selected TG or private calls to your DMR ID,
    # skip your own TXs, and skip events whose Stop field is still 0
    if (
        call["DestinationID"] in (31088, 1234567)
        and call["SourceID"] != 1234567
        and last_message_id != call["SessionID"]
        and call["Stop"] != 0
    ):
        group_id=''
        # Marking different sources with symbols
        if call["DestinationID"] == 31088:
            group_id='C'
        # if call["DestinationID"] == 1234567:
        #     group_id='!'
        date = datetime.fromtimestamp(call["Start"])
        # date = datetime.fromisoformat(call["Created"].replace('Z', '+00:00'))
        date_local = date.astimezone(tz.tzlocal())
        if (date > datetime.now()-timedelta(minutes=5)) and (call["SessionID"] not in dq_id): # to exclude messages from the past
            print(json.dumps(call,separators=(',',':'),sort_keys=True,indent=4))
            dq_id.append(call["SessionID"])
            if call["SourceName"]: # workaround to avoid empty names for YSF calls
                callsign_display_string = call["SourceCall"] + ' ' + call["SourceName"]
            elif call["TalkerAlias"]:
                callsign_display_string = call["TalkerAlias"]
            else:
                callsign_display_string = call["SourceID"]
            dq_str.append(date_local.strftime("%H:%M") + ' ' + group_id + ' ' + str(callsign_display_string))
        last_message_id = call["SessionID"]
    return


_thread.start_new_thread(draw_on_display, ())
last_message_id = ''
sio.connect(url='https://api.brandmeister.network', socketio_path="/lh/socket.io", transports="websocket")
sio.wait()
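The row coordinates in draw_on_display also explain the limit of 12 records. The arithmetic below reuses the constants from the listing (24-point font, rows at `7 + 30 * k`). The 400-pixel figure is an assumption about the long side of the 4.2-inch panel, and the font size is used as a rough stand-in for the glyph height. `row_positions` is a hypothetical helper.

```python
FONT_SIZE = 24         # point size passed to ImageFont.truetype in the listing
ROW_PITCH = 30         # vertical distance between rows in the listing
FIRST_ROW_OFFSET = 7   # extra offset below the header line
PANEL_LONG_SIDE = 400  # assumption: long side of the 4.2-inch panel in pixels


def row_positions(count):
    """Y coordinate of every record row, newest record first."""
    return [
        FIRST_ROW_OFFSET + ROW_PITCH * (count - i)
        for i in range(count - 1, -1, -1)
    ]


rows = row_positions(12)
# Approximate bottom edge of the last row.
print(rows[0], rows[-1], rows[-1] + FONT_SIZE)
```

Under these assumptions a thirteenth row would start at y = 397 and run past the bottom edge, which is consistent with the queue length of 12.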

This can be orchestrated by a script with an infinite loop, a service, or supervisor-like software. I used the simplest solution, a script:

#!/bin/bash
while true; do
python3.7 bm-api-new-socket.py
sleep 10
done

Not so elegant from an error-handling perspective (when Brandmeister drops the connection), but it works 100% of the time.
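The same restart loop could, as an alternative, live inside the Python process. The sketch below is not part of the original program: `run_with_restart` is a hypothetical helper, and `session` stands for any callable that would wrap the connect-and-wait calls from the listing. The `max_runs` and `sleep` parameters exist only so the loop can be tried out without waiting.

```python
import time


def run_with_restart(session, delay=10, max_runs=None, sleep=time.sleep):
    """Call session() repeatedly, pausing between runs like the shell loop."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            session()
        except Exception as exc:
            # A dropped connection is reported, then the session is retried.
            print(f"session ended with error: {exc!r}")
        runs += 1
        sleep(delay)
    return runs
```

One possible benefit of this variant is that the in-memory history would survive a reconnect, because the process itself is not restarted. I have not tested this against the live stream.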
