1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
| import cv2 import os
import serial from serial.tools import list_ports import time import pyrealsense2 as rs import numpy as np
class FilaWall: screenWidth = 5 screenHeight = 4 pixelWidth = screenWidth * 4 pixelHeight = screenHeight * 4 pipeline = None ser = None fps = 30
def init_slave(self): availables = ['---:Select one below:---'] for device in list_ports.comports(): availables.append(device.device) id = 0; target_port = 0 for port in availables: print(id, port) id += 1 if len(availables) > 2: target_port = int(input('\nChoose device for FilaWall > ')) if target_port < 1 or target_port >= len(availables): print('Fatal: invalid port.\n\nTask failed successfully.') exit(-1) elif len(availables) == 2: print('\nChoosing default device [1].') target_port = 1 else: print('Fatal: no available port.\n\nTask failed successfully.') exit(-1) os.system("sudo chmod 777 {}".format(availables[target_port])) self.ser = serial.Serial(availables[target_port], 115200, timeout=0.5) time.sleep(2) print("Serial port {} opened".format(target_port))
def _sendCmd(self, cmd: int): byte_data = bytearray() byte_data.append(cmd) self.ser.write(byte_data)
def _send_seq(self, seq: list): self._sendCmd((seq[0] << 0) + (seq[1] << 1) + (seq[2] << 2) + (seq[3] << 3))
def init_realsense(self): self.pipeline = rs.pipeline() config = rs.config() config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, self.fps) self.pipeline.start(config)
def _displayUnit(self, unitRow, unitCol, image): for row in range(unitRow * 4 + 4 - 1, unitRow * 4 - 1, -1): seq = [] for col in range(unitCol * 4 + 4 - 1, unitCol * 4 - 1, -1): seq.append(int(image[row][col] > 0)) self._send_seq(seq)
def start_running(self): while True: frames = self.pipeline.wait_for_frames() depth_rs = frames.get_depth_frame() depth = np.asanyarray(depth_rs.get_data()) dimg_gray = cv2.convertScaleAbs(depth, alpha=255/4000) dimg_gray = cv2.flip(dimg_gray, 1) gray_part = dimg_gray[0:480, 10:610] gray_blur = cv2.blur(gray_part, (30, 30)) gray_rescaled = cv2.resize(gray_blur, (self.pixelWidth, self.pixelHeight)) ret, gray_thresh = cv2.threshold(gray_rescaled, 100, 255, cv2.THRESH_BINARY_INV)
gray_rescaled_show = cv2.resize(gray_thresh, (600, 480), interpolation=cv2.INTER_AREA) color_depth = cv2.applyColorMap(gray_part, cv2.COLORMAP_JET) cv2.imshow('Color Depth', color_depth) cv2.imshow('Gray Blur', gray_blur) cv2.imshow('Gray Rescaled', gray_rescaled_show) for unitRow in range(self.screenHeight): unitColIter = (range(self.screenWidth) if ((unitRow + self.screenHeight) % 2 == 0) else range(self.screenWidth - 1, -1, -1)) for unitCol in unitColIter: self._displayUnit(unitRow, unitCol, gray_thresh) self._sendCmd(0x10) if cv2.waitKey(1) == ord('q'): break cv2.destroyAllWindows()
|