curl -> Rest Proxy -> kafka -> sink-jdbcconnector -> pgsql

joshua90 2020. 6. 29. 20:57

This post shows how to add rows to a PostgreSQL table by sending requests to the Rest Proxy API, using Rest Proxy and the JDBC sink connector from the Kafka ecosystem (with Schema Registry).

1. Configure the JDBC sink connector properties (/home/vagrant/sink-postgresql.properties)

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

topics=orders

connection.url=jdbc:postgresql://localhost:5432/test
connection.user=postgres
connection.password=postgres

auto.create=true

2. Run the JDBC connector

sudo /usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /home/vagrant/sink-postgresql.properties
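
Once the worker is running, the connector state can be checked through the Kafka Connect REST API. The following is a minimal sketch, not part of the original setup: it assumes the standalone worker listens on the default port 8083 and returns compact JSON, and `connector_state` is a hypothetical helper name. It prints the first `state` field, which is the connector-level one in the usual response layout.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the connector-level state (e.g. RUNNING, FAILED)
# from a Kafka Connect status response passed as $1.
connector_state() {
  printf '%s' "$1" | grep -o '"state":"[A-Z_]*"' | head -n 1 | cut -d '"' -f 4
}

# Usage (assumes the Connect REST API is reachable on localhost:8083):
#   status=$(curl -s http://localhost:8083/connectors/test-sink/status)
#   connector_state "$status"
```

If the state is not RUNNING, the worker log printed by connect-standalone is the first place to look.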

3. Register the schema through Rest Proxy (the request embeds the Avro schema in value_schema and also produces the first record)

curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"value_schema": "{ \"type\":\"record\",\"name\":\"myrecord\",\"fields\": [ {\"name\": \"id\",\"type\": \"int\" }, {\"name\": \"product\", \"type\": \"string\" }, {\"name\": \"quantity\", \"type\": \"int\"}, {\"name\": \"price\", \"type\": \"float\" } ] }" , "records": [ {"value": {"id": 1, "product": "foo", "quantity": 100, "price": 50} } ] }' "http://localhost:8082/topics/orders"
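
The response to the request above includes the id that Schema Registry assigned to the schema, in the `value_schema_id` field. The next step hardcodes 1, which holds only when this is the first schema registered. As a hedged sketch, the id can be read from the response instead; `extract_schema_id` is a hypothetical helper, and it assumes the response body is passed in as a single string.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the numeric "value_schema_id" found in a
# Rest Proxy v2 produce response passed as $1. Prints nothing when the
# field is missing or null.
extract_schema_id() {
  printf '%s' "$1" \
    | sed -n 's/.*"value_schema_id"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# Usage: capture the response of the step 3 request, then extract the id.
#   response=$(curl -s -X POST ... "http://localhost:8082/topics/orders")
#   schema_id=$(extract_schema_id "$response")
```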

4. Add data through Rest Proxy (reusing the registered schema by value_schema_id)

curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"value_schema_id": 1 , "records": [ {"value": {"id": 3, "product": "bar", "quantity": 10, "price": 500} } ] }' "http://localhost:8082/topics/orders"
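
To send further rows without editing the JSON by hand, the request body can be generated from arguments. This is only a sketch: `build_payload` is a hypothetical helper, it does not JSON-escape the product name, and the schema id should be whatever step 3 returned. The psql command in the usage comment assumes the connector created the table under the topic name (orders), which is the connector's default naming when auto.create=true.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build a produce request body for the orders schema.
# Arguments: schema id, id, product, quantity, price.
# Note: the product string is inserted as-is, without JSON escaping.
build_payload() {
  printf '{"value_schema_id": %d, "records": [{"value": {"id": %d, "product": "%s", "quantity": %d, "price": %s}}]}' \
    "$1" "$2" "$3" "$4" "$5"
}

# Usage: post the payload, then check the table in PostgreSQL.
#   curl -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
#        -H "Accept: application/vnd.kafka.v2+json" \
#        --data "$(build_payload 1 3 bar 10 500)" \
#        "http://localhost:8082/topics/orders"
#   psql -h localhost -U postgres -d test -c 'SELECT * FROM orders;'
```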