В процессе разботы над одной задачей в проекте Карты Mail.ru возникла необходимость чтения формата WKB. Конечно, можно было воспользоваться GDAL, но нужно было только чтение, а все остальные возможности были излишни. Так и родился этот небольшой велосипед.
Хочу сразу предупредить, что функциональность реализована только в необходимых рамках и поддерживаются только базовые типы, такие как:
- точка
- линия
- полигон
- мультилиния
- мультиполигон
Не поддерживается перекодировка порядка следования байт в слове. А так — бери и пользуйся.
Для удобства также пригодится класс-враппер представляющий область памяти как STL-like контейнер; тут даже комментировать ничего не надо. Его можно забрать отсюда. И дополнительные простейшие классы для геометрических примитивов: точки и прямоугольника.
namespace ml {
struct point_d {
double x;
double y;
};
inline std::ostream & operator << (std::ostream &s, point_d const &p) {
return s << p.x << ", " << p.y;
}
inline double distance(point_d const &p1, point_d const &p2) {
double dx = (p1.x-p2.x);
double dy = (p1.y-p2.y);
return sqrt(dx*dx+dy*dy);
}
struct rect_d {
point_d min;
point_d max;
static rect_d void_rect() {
return rect_d(std::numeric_limits<T>::max(),std::numeric_limits<T>::max(),
-std::numeric_limits<T>::max(),-std::numeric_limits<T>::max());
}
bool empty() const {
return (*this == rect_d::void_rect() || (width()==0 && height()==0));
}
inline double height() const {
return fabs(max.y-min.y);
}
inline double width() const {
return fabs(max.x-min.x);
}
rect_d & extend(const rect_d &r) {
if(r.empty())
return *this;
min.x = std::min(min.x,std::min(r.min.x,r.max.x));
min.y = std::min(min.y,std::min(r.min.y,r.max.y));
max.x = std::max(max.x,std::max(r.min.x,r.max.x));
max.y = std::max(max.y,std::max(r.min.y,r.max.y));
return *this;
}
template< typename C >
rect_d & bounds( C begin, C end) {
for(; begin != end; ++begin) {
min.x = std::min(min.x,begin->x);
min.y = std::min(min.y,begin->y);
max.x = std::max(max.x,begin->x);
max.y = std::max(max.y,begin->y);
}
return *this;
}
};
inline std::ostream & operator << (std::ostream &s,const rect_d &r) {
return s << '[' << r.min << ',' << r.max << ']';
}
} // namespace ml
Для использования класса необходимо предварительно загрузить данные в формате WKB в память. Как это будет сделано, пусть останется на усмотрение того, кто будет это использовать.
Использовать написанный класс предполагалось примерно следующим образом:
char *data = ....
ml::wkb shape(data);
std::cout << ml::wkb::type_to_text(shape.type()) << std::endl;
std::cout << shape.bounds() << std::endl;
Первая версия получилась решением в лоб, и выглядела так:
namespace ml {
class wkb {
char const * m_data;
public:
enum type_e{
wkb_point = 1,
wkb_line_string = 2,
wkb_polygon = 3,
wkb_multi_point = 4,
wkb_multi_line_string = 5,
wkb_multi_polygon = 6,
wkb_geometry_collection = 7
};
static char const *type_to_text(type_e type) {
switch (type) {
case wkb_point: return "wkb_point";
case wkb_line_string: return "wkb_line_string";
case wkb_polygon: return "wkb_polygon";
case wkb_multi_point: return "wkb_multi_point";
case wkb_multi_line_string: return "wkb_multi_line_string";
case wkb_multi_polygon: return "wkb_multi_polygon";
case wkb_geometry_collection: return "wkb_geometry_collection";
default: return "unknown";
}
}
typedef container<ml::point_d> line;
wkb(char const *data) : m_data(data) {}
wkb & operator = (wkb const &d) { m_data = d.m_data; return *this; }
inline char byte_order() const { return m_data[0]; }
inline type_e type() const { return (type_e)*(int*)&m_data[1]; }
inline int size() const { return *(int*)&m_data[5]; }
line linestring() const {
return line((line::value_type*)&m_data[9], size());
}
ml::point_d const & point() const {
return *(ml::point_d*)&m_data[5];
}
wkb linestring(size_t index) const {
size_t offset = 9;
for(size_t i=0; i<size(); ++i) {
wkb l(&m_data[offset]);
if (i == index)
return l;
offset += 9 + 16 * l.size();
}
return wkb(0);
}
line ring(size_t index) const {
size_t offset = 9;
for(size_t i=0; i<size(); ++i) {
int point_count = *(int*)&m_data[offset];
if(i == index)
return line((line::value_type*)&m_data[offset+4], point_count);
offset = offset + 4 + 16*point_count;
}
return line(NULL,(size_t)0);
}
wkb polygon(size_t index) const {
size_t offset = 9;
for(size_t i=0; i<this->size(); ++i) {
wkb o(&m_data[offset]);
if(i == index)
return o;
for(size_t j=0; j<o.size(); ++j)
offset += o.ring(j).size() * 16 + 4;
offset += 9;
}
return wkb(0);
}
ml::rect_d bounds() const {
ml::rect_d r(ml::rect_d::void_rect());
switch (type()) {
case wkb_polygon: {
for(size_t i=0; i < size(); ++i) {
line l = ring(i);
r.bounds(l.begin(),l.end());
}
} break;
case wkb_multi_polygon: {
for(size_t i=0; i < size(); ++i) {
r.extend(polygon(i).bounds());
}
} break;
case wkb_point: {
r.min.x = r.max.x = *(double*)&m_data[5];
r.min.y = r.max.y = *(double*)&m_data[13];
} break;
case wkb_line_string: {
line l = linestring();
r.bounds(l.begin(),l.end());
} break;
case wkb_multi_line_string: {
for(size_t i=0; i < size(); ++i) {
r.extend(linestring(i).bounds());
}
} break;
default:
break;
}
return r;
}
};
} // namespace ml
В принципе, все работало и давало верные результаты, но (куда же без этого «но») когда пришлось обрабатывать несколько миллионов объектов, время, затраченное на обработку, нас не устроило.
По результатам профилирования стало понятно, почему очень много времени тратилось на вычисление нужной части WKB. Создавалось много промежуточных объектов на стеке и была в наличии пусть небольшая, но рекурсия. Почесав в затылке, мы решили несколько облагородить код и избавиться от ненужной рекурсии, да и вообще от лишних действий. Для начала мы добавили код вычисления окончания текущего элемента и получения начала следующего элемента:
inline const char * end_of_ring(const char *ptr) const {
return ptr + 4 + (*(unsigned int *)(ptr) * 16);
}
inline const char * end_of_polygon(const char *ptr) const {
for (size_t sz = *(unsigned int*)((ptr+=9)-4);sz--;)
ptr += (*(unsigned int *)(ptr) * 16) + 4;
return ptr;
}
После этого были переписаны методы получения элементов, они стали проще и очевиднее:
line linestring() const {
return line((line::value_type*)(m_data+9), size());
}
ml::point_d const & point() const {
return *(ml::point_d*)(m_data+5);
}
wkb linestring(size_t index) const {
assert(index>=0 && index<m_size);
const char *ptr = m_data+9;
while (index--) {
ptr += 9 + (*(unsigned int*)(ptr+5) * 16);
}
return wkb(ptr);
}
wkb polygon(size_t index) const {
assert(index>=0 && index<m_size);
const char *ptr = m_data+9;
while (index--) {
ptr = end_of_polygon(ptr);
}
return wkb(ptr);
}
line ring(size_t index) const {
assert(index>=0 && index<m_size);
const char *ptr = m_data+9;
while (index--) {
ptr = end_of_ring(ptr);
}
return line((line::value_type*)(ptr+4),(line::value_type*)end_of_ring(ptr));
}
Уже это увеличило производительность. Но учитывая специфику обработки точек, можно было еще улучшить архитектуру и удобство, добавив следующий метод:
template< typename T >
T & apply( T & func) const {
switch (type()) {
case wkb_polygon: {
const char * ptr = m_data+9;
for(size_t i=0; i<m_size; ++i) {
ml::point_d *begin = (ml::point_d*)(ptr+4);
ml::point_d *end = begin + *(unsigned int *)(ptr);
ptr = (const char *)end;
func(begin,end);
}
} break;
case wkb_multi_polygon: {
const char * ptr = m_data+9;
for(size_t i=0; i<m_size; ++i) {
for (size_t sz = *(unsigned int*)((ptr+=9)-4);sz--;) {
ml::point_d *begin = (ml::point_d*)(ptr+4);
ml::point_d *end = begin + *(unsigned int *)(ptr);
ptr = (const char *)end;
func(begin,end);
}
}
} break;
case wkb_point: {
ml::point_d *begin = (ml::point_d*)(m_data+5);
ml::point_d *end = begin + 1;
func(begin,end);
} break;
case wkb_line_string: {
ml::point_d *begin = (ml::point_d*)(m_data+9);
ml::point_d *end = begin + size();
func(begin,end);
} break;
case wkb_multi_line_string: {
const char * ptr = m_data+9;
for(size_t i=0; i<m_size; ++i) {
ml::point_d *begin = (ml::point_d*)(ptr+9);
ml::point_d *end = begin + *(unsigned int *)(ptr+5);
ptr = (const char *)end;
func(begin,end);
}
} break;
default:
break;
}
return func;
}
мы дописали функтор для вычисления описывающего прямоугольника примерно следующим образом:
struct bounds_counter {
ml::rect_d r;
bounds_counter() : r(ml::rect_d::void_rect()) {}
void operator()(ml::point_d const *begin, ml::point_d const *end) {
r.bounds(begin,end);
}
ml::rect_d const & bounds() const {
return r;
}
};
метод bounds после переписывания стал выглядеть так:
ml::rect_d ml::wkb::bounds() const {
struct bounds_counter {
ml::rect_d r;
bounds_counter() : r(ml::rect_d::void_rect()) {}
void operator()(ml::point_d const *begin, ml::point_d const *end) {
r.bounds(begin,end);
}
ml::rect_d const & bounds() const {
return r;
}
};
bounds_counter b;
return apply(b).bounds();
}
Этот вариант не будет работать для g++, он более жестко следует стандартам:
14.3.1/2: A local type, a type with no linkage, an unnamed type or a type compounded from any of these types shall not be used as a template-argument for a template type-parameter.
но вполне работоспособен в clang и, судя по поиску в интернете, в MSVC++ (честно признаюсь, мы не проверяли). Для универсальности и корректности, определение функтора надо разместить в неименованном пространстве имен.
После всех доработок время выполнения задачи уменьшилось примерно в четыре раза. Также при использовании этого подхода было удобно написать вывод объектов в формате JSON:
std::string ml::wkb::to_geo_json() const {
struct point_formatter {
std::stringstream &ss;
unsigned int counter;
bool parts;
point_formatter(std::stringstream &s) : ss(s),counter(0), parts(false) {}
void operator() (ml::point_d const *begin, ml::point_d const *end) {
if (parts) ss << (counter ? ",[" : "[");
ss << "[" << begin->x << "," << begin->y << "]";
for(++begin;begin != end;++begin){
ss << ",[" << begin->x << "," << begin->y << "]";
}
if (parts) ss << "]";
++counter;
}
point_formatter & make_parts(bool p) {parts = p; counter = 0; return *this;}
};
std::stringstream ss;
point_formatter fmt(ss);
ss << std::fixed << std::showpoint << std::setprecision(6);
ss << "{";
switch(type()) {
case wkb_point: {
ss << ""type":"Point","coordinates":";
apply(fmt);
} break;
case wkb_line_string: {
ss << ""type":"LineString","coordinates": [";
apply(fmt);
ss << "]";
} break;
case wkb_multi_line_string: {
ss << ""type":"MultiLineString","coordinates": [";
apply(fmt.make_parts(true));
ss << "]";
} break;
case wkb_polygon: {
ss << ""type":"Polygon","coordinates": [";
apply(fmt.make_parts(true));
ss << "]";
} break;
case wkb_multi_polygon: {
ss << ""type":"MultiPolygon","coordinates": [";
for(size_t i=0; i < size(); ++i) {
ss << (i ? ",[" : "[");
polygon(i).apply(fmt.make_parts(true));
ss << "]";
}
ss << "]";
} break;
default: break;
}
ss << "}";
return ss.str();
}
В итоге получился достаточно удобный класс для работы с данными, представленными в формате WKB. Стало удобно добавлять новые операции над объектами. Достигнута приличная производительность.
Весь код, приведенный в статье, можно взять здесь и здесь.
Ершов Сергей,
программист проекта Карты@Mail.ru
Автор: darkserj