(파이썬/쓰레드/YDLidar G2/시각화) 잃어버린 패킷을 찾아서
앞서 작성한 스캔 시각화 코드에서,
read_data = s.read(2500)
으로 2500바이트씩 읽어오는데, 이러면 마지막 패킷이 잘려서 버려집니다.
먼저, 이렇게 데이터를 여러번 읽어와서 패킷이 잘려나가는지 확인해보죠.
import serial
import time
G2_MODE = True
X2_MODE = False
G2_BAUDRATE = 230400
X2_BAUDRATE = 115200
DATA_SIZE = 2500
READY_TIME = 7
HOW_MANY_FILES = 4 # 연속 패킷을 잘라서 저장할 파일 갯수
FILENAME_PREFIX = "file"
if G2_MODE == True:
s = serial.Serial('/dev/ttyUSB0', G2_BAUDRATE)
s.write(b'\xa5\x60') # start scanning, output point cloud data
elif X2_MODE == True:
s = serial.Serial('/dev/ttyUSB0', X2_BAUDRATE)
time.sleep(READY_TIME) # the lidar needs some time to spin motor and to start the scan
for i in range(HOW_MANY_FILES):
read_data = s.read(DATA_SIZE)
print("len_read_data: ", len(read_data))
filename = FILENAME_PREFIX + f"_{i}"
with open(filename, "wb") as fs:
fs.write(read_data)
if G2_MODE == True:
s.write(b'\xa5\x65') # stop motor
s.close()
G2를 연결하고 이 녀석을 실행시켜보면,
이진파일 4개가 생성됩니다.
file0 ... file3
hexedit로 조사를 해보면 답이 나오죠.
첫번째 데이터: file0
A5 5A로 시작하는 응답 데이터 8바이트로 시작합니다. 뒤이어 패킷이 AA 55라는 패킷 헤더(PH)가 나오면서 모두 13바이트가 됩니다. 헤더 응답 10바이트, 3바이트 스캔값(보통은 distance, intensity 값이 담긴 3바이트, X2나 X4는 distance만 담긴 2바이트임)인데 스캔값이 달랑 3바이트라는 것은 스캔 데이터가 없다는 표시죠. 그뒤로 AA 55 패킷 헤더가 한번 더 나오지만, 계속 00만 지속될 뿐 스캔 데이터는 들어오지 않습니다. 라이더가 막 회전을 시작하면서 아직 스캔 데이터가 없는 상황이지요. 몇 초 지나면 데이터가 들어오기 시작합니다. ㅎㅎ
이것은 2500바이트 데이터의 마지막 부분인데요, 커서가 있는 AA 55가 패킷 헤더지만, 왠지 패킷이 짧다는 느낌을 주는데, 끊겼다는 느낌적인 느낌이 들지요. 보통 (S1, S2, S3)가 40개, 즉 120바이트에 헤더 정보 10바이트 해서 130바이트가 표준적이라고 볼 수 있습니다.
그럼, 두 번째 2500바이트를 들여다 보지요.
file1: 두 번째 2500바이트의 시작부분과 끝부분
시작하고 20바이트 뒤에 다시 패킷의 헤더 AA 55가 나오네요. 패킷이 file0과 file1로 잘렸다는 사실이 확실하죠. file0과 file1에 걸린 총 48바이트의 순 스캔 정보(응답 헤더를 뺀)가 저의 처리 코드에서 제외되었는데 48/3 하면 16개 각도, 즉 0.5*16 = 8도 에 해당하는 정보가 사라졌습니다. 패킷이 40인 것이 잘렸다면 40*0.5 = 20도... 가 분실되는 거지요. 물론 정확하게 720개(360도)가 아닌 740개 스캔 각도를 점을 1초에 8~12회 찍다 보니, 1초쯤 지나면 분실된 각도도 시작적으로 회복되기야 하겠지요. 하지만 이런 분실 데이터를 그냥 둔다는 것은 뭔가 좀 아쉽지요. 그래서 먼저 이 데이터를 1바이트 단위로 받아서 패킷 헤더 기준으로 잘라서 보관하는 루틴을 먼저 짜보죠.
생각해보니 1바이트 단위로 시리얼에서 읽어온다는 것은 비효율적이고, 기존 방식대로 하되, 마지막 패킷의 자투리를 remained에 보관을 해서, 다음 데이터를 읽어올 때 그 제일 처음에 끼워넣는 방식으로 하는 게 좋겠네요.
is_remained가 1이라면(전에 패킷 자투리가 있다면), new_read_data = remained + read_data
혹시 read_data가 지속적으로 버려져 메모리 누수가 발생한다면,
import gc
del read_data
gc.collect()
하여 쓰레기줍기를 하면 되겠네요.
이렇게 하면 기존 골격은 거의 건드리지 않고, remained가 있는지 체크하고, 있으면 remained를, 읽어온 데이터 제일 앞에 삽입하는 것으로 끝날 수 있겠네요. 자, 한번 해보지요.
먼저 전역변수로
remained = bytes()
로 선언하고, 스레드 함수인 scan_values 안에서 global 로 선언합니다.
global remained
2500바이트 정도 읽어온 데이터를 이제 달라진 변수명에 저장합니다.
o_read_data = s.read(read_size) # original
읽어온 데이터에서 패킷 헤더인 0xaa 0x55를 찾아서 인덱스를 모조리 기록합니다.
indexes = [m.start() for m in re.finditer(b"\xaa\x55", o_read_data)]
count = len(indexes)
len_read = len(o_read_data)
여기까지는 딱히 달라진 게 없는데, 이제...
read_data = remained + o_read_data[indexes[0]:indexes[count-1]]
remained = o_read_data[indexes[count-1]:] # 마지막의 잘린 패킷
읽어온 데이터에서 마지막 패킷 자투리를 제외하고 모두 읽어오고, 이 앞에 remained를 갖다붙입니다. 처음에는 전역 공간에서 빈 바이트를 생성했으므로, 추가되는 바이트는 없습니다. 그저 읽어온 바이트 중 제일 뒤의 쪼개진 패킷을 제외하고 붙이는 거지요.
마지막의 패킷 자투리는 remained에 저장합니다. 이 내용이 이제, 다음 차례에 앞부분에 끼워넣어집니다. 이렇게 해서 항상 read_data는 첫 패킷도, 마지막 패킷도, 토막나지 않은 완전 상태를 이룹니다.
데이터 배치가 달라졌으므로, 인덱싱과 길이 세팅을 다시 합니다.
indexes = [m.start() for m in re.finditer(b"\xaa\x55", read_data)]
count = len(indexes)
len_read = len(read_data)
이제 원래의 o_read_data 는 쓸모가 없어져서 가비지 콜렉터로 지워봅니다.
import gc
del o_read_data
gc.collect()
테스트를 해보니, 쓰레기 수집을 하나 안 하나 프로세스 메모리 사용량에는 변함이 없네요. 다음 차례에 o_read_data에, 읽어온 데이터를 할당할 때 기존 메모리는 자동으로 해제(free)되나 봅니다. 세상 좋아졌네요. 옛날에 C 언어 사용할 때는 메모리 누수를 막느라 전쟁이었는데요...ㅎㅎ
변수 선언까지 합쳐 총 7줄만에 목적을 달성했습니다.
왼쪽의 녹색원이 G2 라이다이고, 그 앞 D는 스피커바, A는 이 그림을 캡쳐하느라 마우스 버턴을 누리는 저의 오른팔이고, B는 저의 불룩한 배고, C는 팔걸이에 걸친 저의 손목 부위입니다. 이제 G2의 기본 기능인 intensity가 뭔지 좀 알아봐야겠군요.
# YDLidar G2 시각화 코드 v2.
# 1. serial.read()로 읽어왔을 때 끝에 잘리는 패킷 짜투리를,
# 뒤에 오는 패킷 짜투리와 이어 붙여서 활용하는 코드 추가
# 2. frequency 설정 코드 추가
import serial
import re
import math
from matplotlib import pyplot as plt
import numpy as np
import cv2
import time
import threading
import gc # 가베지 콜렉터
#----------------------------------------------------
# check code로 데이터 무결성 검토
# 입력값: 해당 패킷, 출력값: False(결점 있음), True(결점 없음)
#----------------------------------------------------
def check_code(data):
len_data = len(data)
lsn = data[3]
fsa1 = data[4] # LSB : start angle
fsa2 = data[5] # MSB : start angle
lsa1 = data[6] # LSB : end angle
lsa2 = data[7] # MSB : end angle
cs = data[8] | (data[9] << 8)
ph = data[0] | (data[1] << 8) # Packet header, G2에서는 0x55AA임
# XOR 연산 시작
tmp_cs = ph ^ (data[2] | (data[3] << 8)) # (1) ct(f&c) 와 lsn
tmp_cs = tmp_cs ^ (fsa1 | (fsa2 << 8)) # (2)
tmp_cs = tmp_cs ^ (lsa1 | (lsa2 << 8)) # (3)
# 거리 및 밝기 정보와 XOR 연산
for n in range(0, lsn):
if 10+3*n+2 >= len_data: # 아래에서 발생할 수 있는 인덱싱 에러 방지. 인덱스가 데이터 길이를 벗어나지 않도록 체크
print("check code: out of index")
break
tmp_cs ^= data[10+3*n] # (4)
tmp_cs ^= (data[10+3*n+1] | data[10+3*n+2] << 8) # (5)
print("cs = ", hex(cs))
print("xor result = ", hex(tmp_cs))
if cs == tmp_cs:
#print("데이터에 결함이 없습니다.")
return True
else:
#print("데이터에 결함이 있습니다.")
return False
#----------------------------------------------------------------
# 각도 구하는 함수들
# ---------------------------------------------------------------
# 맨처음으로, start_angle과 end_angle을 구한다. 이 둘을 구하는 공식은 같다.
def first_level_angle(lsb, msb):
return ((lsb | (msb << 8)) >> 1) / 64.0
# end_angle 과 start_angle의 차이를 구한다.
# end_angle 이 360도를 넘어가서 1부터 시작하면 360을 더해서, 거기서 start_angle을 빼서 차이를 구한다.
def diff_angle(end_angle, start_angle):
if end_angle < start_angle: # end_angle이 360도를 넘어 1도로 되돌아간 상황
end_angle += 360
return end_angle - start_angle
# start_angle 과 end_angle 사이의 angle들을 구한다.
# diff_angle 을 lsn-1로 나누면 하나의 간격(약 0.5도) angle이 나온다. 이 angle*idx를 start_angle에 더한다.
# 360도를 넘어가면 360을 뺀다.
def inter_angle(idx, diff_angle, lsn, start_angle):
ret = (diff_angle / (lsn - 1)) * (idx - 1) + start_angle
if ret >= 360:
ret -= 360
return ret
# 개발자 가이드 나온대로 따라한 수정 앵글 value
def angle_correct(angle, distance):
if distance == 0:
return 0.0
#return angle + (math.atan(21.8 * ((155.3 - distance)/(155.3*distance))) * 180/math.pi) # OK
#return angle + (math.atan2(21.8 * (155.3 - distance), 155.3 * distance) * 180/math.pi) # OK
return angle + math.degrees(math.atan2(21.8 * (155.3 - distance), 155.3 * distance))
# 거리와 각도로부터 점의 좌표 구해서 math.atan2에 집어넣기 : 각도를 degree단위로 바꿀 필요성
# math.degrees(): degree로 변환, math.radians(): 라디안으로 변환
#----------------------------------------------------------------
# 패킷을 분석해서 xy 좌표값, 거리를 구하는 핵심 함수
# ---------------------------------------------------------------
def scan_values(read_size, MIN_LENGTH):
global xys
global remained
s = serial.Serial('/dev/ttyUSB0', 230400)
#for i in range(6):
# s.write(b'\xa5\x0c') # increase the frequency by 1Hz
s.write(b'\xa5\x60') # start scanning, output point cloud data
while True:
xs = [] # 1회 read_size해서 구한 x 좌표값들을 저장하는 리스트
ys = [] # 1회 read_size해서 구한 y 좌표값들을 저장하는 리스트
o_read_data = s.read(read_size) # original
# parse response header
#idx = o_read_data.find(b'\xa5\x5a') # search for starting point
#response_mode = o_read_data[5] >> 6
indexes = [m.start() for m in re.finditer(b"\xaa\x55", o_read_data)]
count = len(indexes)
len_read = len(o_read_data)
print("indexes: ", indexes)
print("len(indexes)", count)
print("len_read: ", len_read)
read_data = remained + o_read_data[indexes[0]:indexes[count-1]]
remained = o_read_data[indexes[count-1]:] # 마지막의 잘린 패킷
# o_read_data 변수에 다음에 다시 할당될 때, 기존에 할당된 메모리 영역은 자동으로 가베지 셀릭트 되는 듯.
#del o_read_data
#gc.collect()
# 데이터가 구조가 달라졌으니 다시 세팅
indexes = [m.start() for m in re.finditer(b"\xaa\x55", read_data)]
count = len(indexes)
len_read = len(read_data)
print("indexes: ", indexes)
print("len(indexes)", count)
print("len_read: ", len_read)
for i in range(count):
if i == count - 1: # 마지막 패킷이라면, end 인덱스가 없으므로 끝지점을 직접 지정
data = read_data[indexes[i] : len_read]
#print("last packet length : ", len(data))
else :
data = read_data[indexes[i] : indexes[i+1]] # header부터 다음 header 직전까지 읽음
len_data = len(data)
#print("len_data: ", len_data)
if len_data <= MIN_LENGTH:
continue
frequency = (data[2] >> 1) / 10.0 # current frequency
#print("current frequency: ", frequency)
packet_type = data[2] & 0b01 # 하위 1비트만 얻음
#print("current packet type: ", packet_type)
lsn = data[3] # sample quantity
#print("-------")
print("lsn(sample quantity): ", lsn)
if lsn != 1: # 한 패킷 안의 샘플링 데이터 갯수. 없으면 lsn=1
# 앵글 계산 end_angle , start_angle
fsa1 = data[4] # LSB : start angle
fsa2 = data[5] # MSB : start angle
lsa1 = data[6] # LSB : end angle
lsa2 = data[7] # MSB : end angle
#check code 해서 무결성 검사
if check_code(data) == True:
print("데이터에 결함이 없습니다.")
else:
print("데이터에 결함이 있습니다.")
start_angle = first_level_angle(fsa1, fsa2)
end_angle = first_level_angle(lsa1, lsa2)
diff_angle_ = diff_angle(end_angle, start_angle)
print("start_angle: ", start_angle)
print("end_angle: ", end_angle)
print("diff_angle: ", diff_angle_)
print("step angle: ", diff_angle_/(lsn-1))
for j in range(0, lsn):
if 10+3*j+2 >= len_data: # 아래에서 발생할 수 있는 인덱싱 에러 방지. 인덱스가 데이터 길이를 벗어나지 않도록 체크
break
# --------- Luminous intensity 계산
# 첫 번째 바이트를 전부 취하고 (최대 256 표현), 두 번째 바이트의 하위 2개 비트에 256을 곱해서(= 왼쪽으로 8번 비트 쉬프트해서),
# 이 둘을 더해줌. 2^8은 256이므로, 해당 비트들을 왼쪽으로 8번 쉬프트하는 것이나, 256을 곱하는 것이나 동일함.
# 0b11은 이진수 표현. 16진수로 표현하자면 0x03이 됨.
# 표현 범위: 0~1023
intensity = data[10+3*j] + (data[10+3*j+1] & 0b11) * 256 # 두 번째 바이트 하위 2비트 얻어서 256을 곱해서 첫번째 바이트와 더함
print("Luminous intensity: ", intensity)
# -------- 거리 계산
# 두 번째 바이트 상위 6비트를 밑으로 내리고, 세번째 바이트 비트 전부를 여섯 비트(계단) 올려서 이 둘을 비트 조합(OR 연산)함.
distance = ((data[10+3*j+1] >> 2) | (data[10+3*j+2] << 6))
print("distance: ", distance) # 단위 mm
# Intermediate angle solution formula (중간 각도 구하기). 코멘트는 메뉴얼에 나온 공식대로. 메뉴얼 설명이 부실함.
if j > 0 and j < lsn-1: # start_angle 과 end_angle 가급적 건드리지 않기
inter_angle_ = inter_angle(j, diff_angle_, lsn, start_angle)
elif j == 0:
inter_angle_ = start_angle
elif j == lsn - 1:
inter_angle_ = end_angle
angle_correct_ = angle_correct(inter_angle_, distance)
# 좌표 구하기 pos_x, pos_y
pos_x = distance * math.cos(math.radians(angle_correct_))
#pos_x = distance * math.cos(inter_angle_*(math.pi/180))
# plot으로 그릴 때는 아래 값에 -1을 곱해야 했었는데
# opencv로 출력할 때는 곱하지 않아도 vertical_flip(상하반전) 일어나지 않음.
pos_y = distance * math.sin(math.radians(angle_correct_))
print(f"(x, y) : ({pos_x}, {pos_y})")
# 리스트에 추가하기
xs.append(pos_x)
ys.append(pos_y)
xys.append([xs, ys])
print("xs: ", xs[:10])
print("ys: ", ys[:10])
print("len_xys", len(xys))
#return xys
s.write(b'\xa5\x65') # stop motor
s.close()
#----------------------------------------------------------------
# 시각화 함수: 분석값을 받아와서 좌표평면에 점을 찍는 rviz 비슷하게.
# ---------------------------------------------------------------
def my_viz(WIDTH=800, HEIGHT=800, INTER=100):
# 좌표 평면의 너비와 폭
#WIDTH = 800
#HEIGHT = 800
# 배경색, 격자 간격, 격자의 색깔
background_color = 70
#background_color = (90, 79, 74) # 어두운 회색
grid_color = (100, 100, 100)
grid_thickness = 1
#INTER = 100 # 격자 간격
row_numbers = int(HEIGHT / 100)
column_numbers = int(WIDTH / 100)
while True:
# 좌표 평면 생성
img = np.full((HEIGHT, WIDTH, 3), background_color, dtype=np.uint8)
# 세로선 그리기
for i in range(1, column_numbers):
cv2.line(img, (i * INTER, 0), (i * INTER, HEIGHT), grid_color, grid_thickness, cv2.LINE_AA)
# 가로선 그리기
for i in range(1, row_numbers):
cv2.line(img, (0, i * INTER), (WIDTH, i * INTER), grid_color, grid_thickness, cv2.LINE_AA)
# 좌표 중심(0, 0) 잡기. (x, y) 좌표값들의 분포를 구해야 함. 일단, 정가운데로 잡기. 가령 (400, 400)
center = (WIDTH // 2, HEIGHT // 2)
# 중심점에 빨간색 원 그리기
center_color = (0, 255, 0)
center_radius = 5
cv2.circle(img, center, center_radius, center_color, cv2.FILLED, cv2.LINE_AA)
# --------------------------------
# 평면에 xy 점 찍기
# --------------------------------
# mm로 표현된 xys 값을 좌표값 단위(pixel = cm 단위)로 바꿈 => 나누기(//) 10 (cv2 출력에서는 소수점 안 됨)
# (-400, -400)(400, 400) 스케일을 (0, 0)(800, 800) 스케일로 바꿈 => 좌표값에 400을 더함
xy_color = (0, 0, 255)
xy_radius = 1
print("from thread2: len_list_numbers = ", len(xys))
if (len(xys) > 0):
xy = xys.pop(0) # list type
# 리스트에는 연산 연력이 없으므로 넘파이 배열로 바꿔서 연산함
x = np.array(xy[0], dtype=np.int16) // 10 + 400
y = np.array(xy[1], dtype=np.int16) // 10 + 400
print("-------------------len(x) : ", len(x))
for i in range(len(x)):
cv2.circle(img, (x[i], y[i]), xy_radius, xy_color, cv2.FILLED, cv2.LINE_AA)
cv2.imshow('img', img)
if cv2.waitKey(1) == ord('q'):
break
time.sleep(0.04) # 초당 24프레임
'''
def plot_data(x, y):
plt.cla()
plt.ylim(-4000, 4000)
plt.xlim(-4000, 4000)
plt.scatter(x, y, s=1**2)
plt.axis('square')
plt.show()
'''
#----------------------- main ------------------------
#s = serial.Serial('/dev/ttyUSB0', 230400)
#s.write(b'\xa5\x60') # start scanning, output point cloud data
# x, y 좌표값을 담는 전역변수. 쓰레드들이 사용해야 하기 때문에 전역으로 뺐음.
# [[x1_values, y1_values], [x2_values, y2_values], ...]
# 제일 앞에 저장된 원소를 pop()해서 opencv로 그림.
xys = []
#read_size = 6000 # 한번에 읽어오는 데이터 길이
MIN_LENGTH = 13 # 한 패킷의 최소 길이. 헤더 정보 10바이트와 점 한 개 3바이트 해서 총 13바이트에 못 미치면 해당 패킷은 건너뛴다.
#header_size = 7 # response 헤드 bytes
#cloud_header = 10 # 클라우드 데이터 헤드 bytes
#point_size = 3 # 점 하나 데이터 bytes (거리 및 루미넌스 값)
remained = bytes() # 패킷의 자투리를 보관하는 바이트 변수.
# 쓰레드 생성
thread_1 = threading.Thread(target=scan_values, args=(2500, MIN_LENGTH))
thread_2 = threading.Thread(target=my_viz, args=(800, 800, 100))
thread_1.start()
thread_2.start()
#s.write(b'\xa5\x65') # stop motor
#s.close()