Bläddra i källkod

rabbitmq topic 추가

tobby48 5 år sedan
förälder
incheckning
b9bc6d1856

+ 39
- 0
src/main/java/kr/co/swh/lecture/opensource/rabbitmq/EmitLogTopic.java Visa fil

@@ -0,0 +1,39 @@
1
+package kr.co.swh.lecture.opensource.rabbitmq; 
2
+
3
+import com.rabbitmq.client.Channel;
4
+import com.rabbitmq.client.Connection;
5
+import com.rabbitmq.client.ConnectionFactory;
6
+
7
+/**
8
+ * <pre>
9
+ * kr.co.swh.lecture.opensource.rabbitmq 
10
+ * EmitLogTopic.java
11
+ *
12
+ * 설명 :
13
+ * </pre>
14
+ * 
15
+ * @since : 2020. 6. 21.
16
+ * @author : tobby48
17
+ * @version : v1.0
18
+ */
19
+public class EmitLogTopic {
20
+
21
+  private static final String EXCHANGE_NAME = "news.new";
22
+
23
+  public static void main(String[] argv) throws Exception {
24
+    ConnectionFactory factory = new ConnectionFactory();
25
+    factory.setHost("dev-swh.ga");
26
+    try (Connection connection = factory.newConnection();
27
+         Channel channel = connection.createChannel()) {
28
+
29
+        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
30
+
31
+        String routingKey = "naver";
32
+        String message = "뉴스다";
33
+
34
+        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
35
+        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
36
+    }
37
+  }
38
+  //..
39
+}

+ 53
- 0
src/main/java/kr/co/swh/lecture/opensource/rabbitmq/ReceiveLogsTopic.java Visa fil

@@ -0,0 +1,53 @@
1
+package kr.co.swh.lecture.opensource.rabbitmq; 
2
+
3
+import com.rabbitmq.client.*;
4
+import java.io.IOException;
5
+
6
+/**
7
+ * <pre>
8
+ * kr.co.swh.lecture.opensource.rabbitmq 
9
+ * ReceiveLogsTopic.java
10
+ *
11
+ * 설명 :
12
+ * </pre>
13
+ * 
14
+ * @since : 2020. 6. 21.
15
+ * @author : tobby48
16
+ * @version : v1.0
17
+ */
18
+public class ReceiveLogsTopic {
19
+	
20
+	private static final String EXCHANGE_NAME = "news.new";
21
+	
22
+	// argv -> "kern.*" // "*.critical" // "kern.*" "*.critical" // "kern.critical" "A critical kernel error"
23
+	
24
+	public static void main(String[] argv) throws Exception {
25
+		ConnectionFactory factory = new ConnectionFactory();
26
+		factory.setHost("dev-swh.ga");
27
+		Connection connection = factory.newConnection();
28
+		Channel channel = connection.createChannel();
29
+		
30
+		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
31
+		String queueName = channel.queueDeclare().getQueue();
32
+		
33
+		String[] bindingKeys = {"naver", "daum"};
34
+		
35
+		for (String bindingKey : bindingKeys) {
36
+			channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
37
+		}
38
+		
39
+		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
40
+		
41
+		Consumer consumer = new DefaultConsumer(channel) {
42
+			@Override
43
+			public void handleDelivery(String consumerTag, Envelope envelope,
44
+				AMQP.BasicProperties properties, byte[] body) throws IOException {
45
+				String message = new String(body, "UTF-8");
46
+				System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
47
+			}
48
+		};
49
+ 
50
+		channel.basicConsume(queueName, true, consumer);
51
+	
52
+	}
53
+}