Blame view
gw_RPi3/python/gateway_http_downlink.py
2.87 KB
7e6be9319
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Tue Jun 8 16:03:39 2021 @author: Georges de Massol """ import grpc from chirpstack_api.as_pb.external import api class Downlink: def __init__(self, server = "localhost:8080", api_token = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhcGlfa2V5X2lkIjoiYzA2ZGY1MDMtNjE5Zi00ZjI2LTgxNzEtYTU0OTRmMWJmYmRmIiwiYXVkIjoiYXMiLCJpc3MiOiJhcyIsIm5iZiI6MTYyMjY1MzMzMSwic3ViIjoiYXBpX2tleSJ9.23eLyvgd5zheP9hDM0acCAl9ojhQjLTZAU77IKqhvQY'): """ Parameters ---------- server : string, optional Chirpstack server adress, with port. The default is "localhost:8080". api_token : TYPE, optional Chirpstack API token. Returns ------- None. """ # This must point to the API interface. self.server = server # The DevEUI for which you want to enqueue the downlink. #dev_eui = bytes([0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01]) # The API token (retrieved using the web-interface). self.api_token = api_token def Lorasend(self, dev_eui, data): """ Parameters ---------- dev_eui : hex The DevEUI for which you want to enqueue the downlink. data : bytes Data to be sent to the endpoint. Returns ------- fCnt : int Downlink frame-counter value. """ # Connect without using TLS. channel = grpc.insecure_channel(self.server) # Device-queue API client. client = api.DeviceQueueServiceStub(channel) # Define the API key meta-data. auth_token = [("authorization", "Bearer %s" % self.api_token)] # Construct request. req = api.EnqueueDeviceQueueItemRequest() req.device_queue_item.confirmed = False req.device_queue_item.data = bytes(data) req.device_queue_item.dev_eui = dev_eui req.device_queue_item.f_port = 10 resp = client.Enqueue(req, metadata=auth_token) return resp.f_cnt def FlushQueue(self, dev_eui): """ Flush the downlink queue of an endpoint. Parameters ---------- dev_eui : hex Endpoint EUI. """ # Connect without using TLS. channel = grpc.insecure_channel(self.server) # Device-queue API client. client = api.DeviceQueueServiceStub(channel) # Define the API key meta-data. auth_token = [("authorization", "Bearer %s" % self.api_token)] req = api.FlushDeviceQueueRequest() req.dev_eui = dev_eui client.Flush(req, metadata=auth_token) |