# gateway_http_downlink.py (2.87 KB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jun  8 16:03:39 2021

@author: Georges de Massol
"""
import grpc
from chirpstack_api.as_pb.external import api

class Downlink:
    """
    Small client for the ChirpStack device-queue gRPC API.

    It can enqueue a downlink payload for an endpoint and flush an
    endpoint's downlink queue. Every call opens its own insecure (non-TLS)
    gRPC channel to ``server`` and closes it when the call completes.
    """

    # NOTE(review): the default ``api_token`` below is a credential committed
    # to the source. It is kept unchanged so callers relying on the default
    # keep working, but it should be revoked and passed in explicitly
    # (e.g. read from configuration or the environment by the caller).
    def __init__(self, server: str = "localhost:8080", api_token: str = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhcGlfa2V5X2lkIjoiYzA2ZGY1MDMtNjE5Zi00ZjI2LTgxNzEtYTU0OTRmMWJmYmRmIiwiYXVkIjoiYXMiLCJpc3MiOiJhcyIsIm5iZiI6MTYyMjY1MzMzMSwic3ViIjoiYXBpX2tleSJ9.23eLyvgd5zheP9hDM0acCAl9ojhQjLTZAU77IKqhvQY') -> None:
        """
        Store the connection settings; no connection is opened here.

        Parameters
        ----------
        server : string, optional
            Chirpstack server address, with port. This must point to the
            API interface. The default is "localhost:8080".
        api_token : string, optional
            Chirpstack API token (retrieved using the web-interface).

        Returns
        -------
        None.

        """
        self.server = server
        self.api_token = api_token

    def _auth_metadata(self):
        """Return the gRPC call metadata carrying the API token as a bearer token."""
        return [("authorization", "Bearer %s" % self.api_token)]

    def Lorasend(self, dev_eui, data, *, f_port: int = 10, confirmed: bool = False) -> int:
        """
        Enqueue a downlink for an endpoint.

        Parameters
        ----------
        dev_eui : hex
            The DevEUI for which you want to enqueue the downlink.
            (presumably a hex-encoded string, as expected by the
            ChirpStack request message -- verify against the API version
            in use.)
        data : bytes
            Data to be sent to the endpoint. It is passed through
            ``bytes()`` before being placed in the request.
        f_port : int, optional
            LoRaWAN FPort to use for the downlink. The default is 10.
        confirmed : bool, optional
            Whether to send the downlink as a confirmed frame.
            The default is False.

        Returns
        -------
        fCnt : int
            Downlink frame-counter value.

        """
        # Build the request first so that an invalid payload fails before
        # any network resource is allocated.
        req = api.EnqueueDeviceQueueItemRequest()
        req.device_queue_item.confirmed = confirmed
        req.device_queue_item.data = bytes(data)
        req.device_queue_item.dev_eui = dev_eui
        req.device_queue_item.f_port = f_port

        # Connect without using TLS. The context manager closes the channel
        # once the call returns or raises, so channels do not accumulate
        # across repeated calls.
        with grpc.insecure_channel(self.server) as channel:
            # Device-queue API client.
            client = api.DeviceQueueServiceStub(channel)
            resp = client.Enqueue(req, metadata=self._auth_metadata())

        return resp.f_cnt

    def FlushQueue(self, dev_eui) -> None:
        """
        Flush the downlink queue of an endpoint.

        Parameters
        ----------
        dev_eui : hex
            Endpoint EUI.

        Returns
        -------
        None.

        """
        req = api.FlushDeviceQueueRequest()
        req.dev_eui = dev_eui

        # Connect without using TLS; the channel is closed when the block
        # exits, whether the call succeeds or raises.
        with grpc.insecure_channel(self.server) as channel:
            # Device-queue API client.
            client = api.DeviceQueueServiceStub(channel)
            client.Flush(req, metadata=self._auth_metadata())