Mbed Host Tests
conn_primitive_remote.py
Go to the documentation of this file.
1#!/usr/bin/env python
2"""
3mbed SDK
4Copyright (c) 2011-2016 ARM Limited
5
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17"""
18
19from mbed_host_tests import DEFAULT_BAUD_RATE
20from mbed_host_tests.host_tests_conn_proxy.conn_primitive import ConnectorPrimitive
21
22
class RemoteConnectorPrimitive(ConnectorPrimitive):
    """! Connector primitive which reaches the DUT through a Global Resource Manager (GRM)

    @details The GRM client module is imported by name (config key 'grm_module').
             During construction a resource matching 'platform_name' (and the
             optional comma separated 'tags') is allocated, flashed with
             'image_path', connected at 'baudrate' and reset.
    """

    def __init__(self, name, config, importer=__import__):
        """! Store configuration and initialize the remote resource

        @param name Name of this connector, passed to ConnectorPrimitive
        @param config Dictionary-like object; keys read here are 'target_id',
                      'grm_host', 'grm_port', 'grm_module', 'platform_name',
                      'baudrate', 'image_path' and 'tags'
        @param importer Callable used to import the GRM module by name
                        (defaults to the built-in __import__)
        """
        ConnectorPrimitive.__init__(self, name)
        self.config = config
        self.target_id = self.config.get('target_id', None)
        self.grm_host = config.get('grm_host', None)
        self.grm_port = int(config.get('grm_port', 8000))
        self.grm_module = config.get('grm_module', 'unknown')
        self.platform_name = config.get('platform_name', None)
        self.baudrate = config.get('baudrate', DEFAULT_BAUD_RATE)
        self.image_path = config.get('image_path', None)
        self.allocate_requirements = {"platform_name": self.platform_name}

        if self.config.get("tags"):
            # Every requested tag is required, hence the constant True value
            self.allocate_requirements["tags"] = {
                tag: True for tag in config["tags"].split(',')
            }

        # Global Resource Mgr tool-kit
        self.remote_module = None
        # Must exist before __remote_init() runs: allocated(), finish() and
        # __del__() read it even when allocation never succeeded
        self.selected_resource = None
        self.client = None

        # Initialize remote resource manager
        self.__remote_init(importer)

    def __remote_init(self, importer):
        """! Initialize DUT using GRM APIs

        @param importer Callable used to import the GRM module by name
        @return True when the resource was allocated, flashed, connected and
                reset; False when the import, the allocation or any of the
                following steps failed
        """

        # We want to load global resource manager module by name from command line (switch --grm)
        try:
            self.remote_module = importer(self.grm_module)
        except ImportError as error:
            self.logger.prn_err("unable to load global resource manager '%s' module!" % self.grm_module)
            self.logger.prn_err(str(error))
            self.remote_module = None
            return False

        self.logger.prn_inf("remote resources initialization: remote(host=%s, port=%s)" %
                            (self.grm_host, self.grm_port))

        # Connect to remote global resource manager
        self.client = self.remote_module.create(host=self.grm_host, port=self.grm_port)

        # First get the resources
        resources = self.client.get_resources()
        self.logger.prn_inf("remote resources count: %d" % len(resources))

        # Query for available resource
        # Automatic selection and allocation of a resource
        try:
            self.selected_resource = self.client.allocate(self.allocate_requirements)
        except Exception as error:
            self.logger.prn_err("can't allocate resource: '%s', reason: %s" % (self.platform_name, str(error)))
            return False

        # Remote DUT connection, flashing and reset...
        try:
            self.__remote_flashing(self.image_path, forceflash=True)
            self.__remote_connect(baudrate=self.baudrate)
            self.__remote_reset()
        except Exception as error:
            self.logger.prn_err(str(error))
            # Do not keep the resource allocated when it cannot be used
            self.__remote_release()
            return False
        return True

    def __remote_connect(self, baudrate=DEFAULT_BAUD_RATE):
        """! Open remote connection to DUT

        @param baudrate Baudrate passed to the GRM module's SerialParameters
        @details Raises Exception when no resource is selected; errors from
                 the GRM API are logged and re-raised
        """
        self.logger.prn_inf("opening connection to platform at baudrate='%s'" % baudrate)
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            serial_parameters = self.remote_module.SerialParameters(baudrate=baudrate)
            self.selected_resource.open_connection(parameters=serial_parameters)
        except Exception:
            self.logger.prn_inf("open_connection() failed")
            raise

    def __remote_disconnect(self):
        """! Close remote connection to DUT if it is open

        @details Raises Exception when no resource is selected; errors from
                 closing the connection are logged and swallowed
        """
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            if self.connected():
                self.selected_resource.close_connection()
        except Exception as error:
            self.logger.prn_err("RemoteConnectorPrimitive.disconnect() failed, reason: " + str(error))

    def __remote_reset(self):
        """! Use GRM remote API to reset DUT

        @details Raises Exception when no resource is selected or when the
                 resource's reset() returns False
        """
        self.logger.prn_inf("remote resources reset...")
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            if self.selected_resource.reset() is False:
                raise Exception("remote resources reset failed!")
        except Exception:
            self.logger.prn_inf("reset() failed")
            raise

    def __remote_flashing(self, filename, forceflash=False):
        """! Use GRM remote API to flash DUT

        @param filename Image passed to the resource's flash()
        @param forceflash Forwarded to the resource's flash() as 'forceflash'
        @details Raises Exception when no resource is selected or when the
                 resource's flash() returns False
        """
        self.logger.prn_inf("remote resources flashing with '%s'..." % filename)
        if not self.selected_resource:
            raise Exception("remote resource not exists!")
        try:
            if self.selected_resource.flash(filename, forceflash=forceflash) is False:
                raise Exception("remote resource flashing failed!")
        except Exception:
            self.logger.prn_inf("flash() failed")
            raise

    def read(self, count):
        """! Read 'count' bytes of data from DUT

        @param count Number of bytes requested from the resource
        @return Data returned by the resource, or an empty string when the
                read failed (the error is logged)
        @details Raises Exception when there is no open connection
        """
        if not self.connected():
            raise Exception("remote resource not exists!")
        data = str()
        try:
            data = self.selected_resource.read(count)
        except Exception as error:
            self.logger.prn_err("RemoteConnectorPrimitive.read(%d): %s" % (count, str(error)))
        return data

    def write(self, payload, log=False):
        """! Write 'payload' to DUT

        @param payload Data passed to the resource's write()
        @param log When True the payload is also logged with prn_txd()
        @return True when the payload was written, False when there is no
                connection or the write failed (LAST_ERROR is set on failure)
        """
        if self.connected():
            try:
                self.selected_resource.write(payload)
                if log:
                    self.logger.prn_txd(payload)
                return True
            except Exception as error:
                self.LAST_ERROR = "remote write error: %s" % str(error)
                self.logger.prn_err(str(error))
        return False

    def flush(self):
        """! Nothing to flush for a remote connection """
        pass

    def allocated(self):
        """! Check if the GRM module is loaded and a resource is allocated

        @return Truthy value when module, resource and its is_allocated flag are all set
        """
        return self.remote_module and self.selected_resource and self.selected_resource.is_allocated

    def connected(self):
        """! Check if there is an open connection to the allocated resource

        @return Truthy value when allocated() holds and the resource's is_connected is set
        """
        return self.allocated() and self.selected_resource.is_connected

    def __remote_release(self):
        """! Release the allocated resource, errors are logged and swallowed """
        try:
            if self.allocated():
                self.selected_resource.release()
                self.selected_resource = None
        except Exception as error:
            self.logger.prn_err("RemoteConnectorPrimitive.release failed, reason: " + str(error))

    def finish(self):
        """! Disconnect from the DUT and release the allocated resource """
        # Finally once we're done with the resource
        # we disconnect and release the allocation
        if self.allocated():
            self.__remote_disconnect()
            self.__remote_release()

    def reset(self):
        """! Reset the DUT through the GRM remote API """
        self.__remote_reset()

    def __del__(self):
        """! Make sure the resource is released when the object is destroyed """
        self.finish()
def finish(self)
Handle DUT destructor-like (close resource) operations here.
def connected(self)
Check if there is a connection to DUT.
def finish(self)
Handle DUT destructor-like (close resource) operations here.