# pulsar-order

**Repository Path**: coderman_hero/pulsar-order

## Basic Information

- **Project Name**: pulsar-order
- **Description**: Automatically closing timed-out orders with Apache Pulsar retry topics (delay queues)
- **Primary Language**: Java
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 2
- **Created**: 2023-07-04
- **Last Updated**: 2023-07-04

## Categories & Tags

**Categories**: Uncategorized
**Tags**: None

## README

# Handling Timed-Out Orders with Apache Pulsar Retry (Delay) Queues

### Common solutions

- RabbitMQ
- Redis
- Kafka TimeWheel (timing wheel)
- **Apache Pulsar**

### Approach to handling timed-out orders

Pulsar is a message queue and stream processing platform that is often compared with Kafka.
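**Pulsar natively supports retry (delay) queues and dead-letter queues, which makes this straightforward to implement.**
The usual approach to automatically closing timed-out orders is shown in the figure below: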
![image.png](img/order-process.png)

### Deploying Pulsar

Deploy Pulsar [1] and Pulsar Manager [2] with Docker Compose.

```yaml
version: '3'
networks:
  pulsar:
services:
  # Start zookeeper
  zookeeper:
    image: apachepulsar/pulsar:3.0.0
    container_name: zookeeper
    restart: on-failure
    networks:
      - pulsar
    volumes:
      - ./data/zookeeper:/pulsar/data/zookeeper
    environment:
      - metadataStoreUrl=zk:zookeeper:2181
      - PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=256m
    command: >
      bash -c "bin/apply-config-from-env.py conf/zookeeper.conf && \
             bin/generate-zookeeper-config.sh conf/zookeeper.conf && \
             exec bin/pulsar zookeeper"
    healthcheck:
      test: [ "CMD", "bin/pulsar-zookeeper-ruok.sh" ]
      interval: 10s
      timeout: 5s
      retries: 30

  # Init cluster metadata
  pulsar-init:
    container_name: pulsar-init
    hostname: pulsar-init
    image: apachepulsar/pulsar:3.0.0
    networks:
      - pulsar
    command: >
      bin/pulsar initialize-cluster-metadata \
               --cluster cluster-a \
               --zookeeper zookeeper:2181 \
               --configuration-store zookeeper:2181 \
               --web-service-url http://broker:8080 \
               --broker-service-url pulsar://broker:6650
    depends_on:
      zookeeper:
        condition: service_healthy

  # Start bookie
  bookie:
    image: apachepulsar/pulsar:3.0.0
    container_name: bookie
    restart: on-failure
    networks:
      - pulsar
    environment:
      - clusterName=cluster-a
      - zkServers=zookeeper:2181
      - metadataServiceUri=metadata-store:zk:zookeeper:2181
      # otherwise every time we run docker compose up or down we fail to start due to Cookie
      # See: https://github.com/apache/bookkeeper/blob/405e72acf42bb1104296447ea8840d805094c787/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Cookie.java#L57-68
      - advertisedAddress=bookie
      - BOOKIE_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
    depends_on:
      zookeeper:
        condition: service_healthy
      pulsar-init:
        condition: service_completed_successfully
    # Map the local directory to the container to avoid bookie startup failure due to insufficient container disks.
    volumes:
      - ./data/bookkeeper:/pulsar/data/bookkeeper
    command: bash -c "bin/apply-config-from-env.py conf/bookkeeper.conf && exec bin/pulsar bookie"

  # Start broker
  broker:
    image: apachepulsar/pulsar:3.0.0
    container_name: broker
    hostname: broker
    restart: on-failure
    networks:
      - pulsar
    environment:
      - metadataStoreUrl=zk:zookeeper:2181
      - zookeeperServers=zookeeper:2181
      - clusterName=cluster-a
      - managedLedgerDefaultEnsembleSize=1
      - managedLedgerDefaultWriteQuorum=1
      - managedLedgerDefaultAckQuorum=1
      - advertisedAddress=broker
      - advertisedListeners=external:pulsar://127.0.0.1:6650
      - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
    depends_on:
      zookeeper:
        condition: service_healthy
      bookie:
        condition: service_started
    ports:
      - "6650:6650"
      - "8080:8080"
    command: bash -c "bin/apply-config-from-env.py conf/broker.conf && exec bin/pulsar broker"

  pulsar-manager:
    image: apachepulsar/pulsar-manager:v0.3.0
    container_name: pulsar-manager
    hostname: pulsar-manager
    restart: on-failure
    networks:
      - pulsar
    volumes:
      - ./application.properties:/pulsar-manager/pulsar-manager/application.properties
    environment:
      - SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties
    ports:
      - "9527:9527"
      - "7750:7750"
    command: bash -c "pulsar-manager/bin/pulsar-manager --pulsar.peek.message=true"
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:7750" ]
      interval: 10s
      timeout: 5s
      retries: 30

  # Create the Pulsar Manager superuser once the manager is healthy
  pulsar-manager-init:
    container_name: pulsar-manager-init
    hostname: pulsar-manager-init
    image: apachepulsar/pulsar:3.0.0
    networks:
      - pulsar
    command:
      - bash
      - -c
      - |
        set -ex
        CSRF_TOKEN=$(curl http://pulsar-manager:7750/pulsar-manager/csrf-token)
        # "$$" escapes "$" for Compose; double quotes let the shell expand the token
        curl \
          -H "X-XSRF-TOKEN: $${CSRF_TOKEN}" \
          -H "Cookie: XSRF-TOKEN=$${CSRF_TOKEN};" \
          -H "Content-Type: application/json" \
          -X PUT http://pulsar-manager:7750/pulsar-manager/users/superuser \
          -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
    depends_on:
      pulsar-manager:
        condition: service_healthy
```

The `application.properties` file mounted into the `pulsar-manager` container:

```properties
spring.cloud.refresh.refreshable=none
server.port=7750

# configuration log
logging.path=
logging.file=pulsar-manager.log

# DEBUG print execute sql
logging.level.org.apache=INFO

mybatis.type-aliases-package=org.apache.pulsar.manager

# database connection

# SQLLite
#spring.datasource.driver-class-name=org.sqlite.JDBC
#spring.datasource.url=jdbc:sqlite:pulsar_manager.db
#spring.datasource.initialization-mode=always
#spring.datasource.schema=classpath:/META-INF/sql/sqlite-schema.sql
#spring.datasource.username=
#spring.datasource.password=

#HerdDB JDBC Driver
spring.datasource.driver-class-name=herddb.jdbc.Driver
# HerdDB - local in memory-only
#spring.datasource.url=jdbc:herddb:local
# HerdDB - start embedded server, data persisted on local disk (directory 'dbdata'), listening on localhost:7000
spring.datasource.url=jdbc:herddb:server:localhost:7000?server.start=true&server.base.dir=dbdata
# HerdDB - start embedded server 'diskless-cluster' mode, WAL and Data persisted on Bookies, Metadata on ZooKeeper in '/herd', listening on localhost:7000
#spring.datasource.url=jdbc:herddb:zookeeper:localhost:2181?server.start=true&server.base.dir=dbdata&server.mode=diskless-cluster&server.node.id=localhost
# HerdDB - connect to standalone server at localhost:7000
#spring.datasource.url=jdbc:herddb:server:localhost:7000
# HerdDB - connect to cluster, uses ZooKeeper for service discovery
#spring.datasource.url=jdbc:herddb:zookeeper:localhost:2181/herd

spring.datasource.schema=classpath:/META-INF/sql/herddb-schema.sql
spring.datasource.username=sa
spring.datasource.password=hdb
spring.datasource.initialization-mode=always

# postgresql configuration
#spring.datasource.driver-class-name=org.postgresql.Driver
#spring.datasource.url=jdbc:postgresql://127.0.0.1:5432/pulsar_manager
#spring.datasource.username=postgres
#spring.datasource.password=postgres

# zuul config
# https://cloud.spring.io/spring-cloud-static/Dalston.SR5/multi/multi__router_and_filter_zuul.html
# By Default Zuul adds Authorization to be dropped headers list. Below we are manually setting it
zuul.sensitive-headers=Cookie,Set-Cookie
zuul.routes.admin.path=/admin/**
zuul.routes.admin.url=http://localhost:8080/admin/
zuul.routes.lookup.path=/lookup/**
zuul.routes.lookup.url=http://localhost:8080/lookup/

# pagehelper plugin
#pagehelper.helperDialect=sqlite
# force 'mysql' for HerdDB, comment out for postgresql
pagehelper.helperDialect=mysql

backend.directRequestBroker=true
backend.directRequestHost=http://localhost:8080
backend.jwt.token=
backend.broker.pulsarAdmin.authPlugin=
backend.broker.pulsarAdmin.authParams=
backend.broker.pulsarAdmin.tlsAllowInsecureConnection=false
backend.broker.pulsarAdmin.tlsTrustCertsFilePath=
backend.broker.pulsarAdmin.tlsEnableHostnameVerification=false

jwt.secret=dab1c8ba-b01b-11e9-b384-186590e06885
jwt.sessionTime=2592000
# If user.management.enable is true, the following account and password will no longer be valid.
pulsar-manager.account=pulsar
pulsar-manager.password=pulsar
# If true, the database is used for user management
user.management.enable=true

# Optional -> SECRET, PRIVATE, default -> PRIVATE, empty -> disable auth
# SECRET mode -> bin/pulsar tokens create --secret-key file:///path/to/my-secret.key --subject test-user
# PRIVATE mode -> bin/pulsar tokens create --private-key file:///path/to/my-private.key --subject test-user
# Detail information: http://pulsar.apache.org/docs/en/security-token-admin/
jwt.broker.token.mode=
jwt.broker.secret.key=file:///path/broker-secret.key
jwt.broker.public.key=file:///path/pulsar/broker-public.key
jwt.broker.private.key=file:///path/broker-private.key

# bookie
bookie.host=http://localhost:8050
bookie.enable=false

redirect.scheme=http
redirect.host=localhost
redirect.port=9527

# Stats interval
# millisecond
insert.stats.interval=30000
# millisecond
clear.stats.interval=300000
init.delay.interval=0

# cluster data reload
cluster.cache.reload.interval.ms=60000

user.access.token.expire=604800

# thymeleaf configuration for third login.
spring.thymeleaf.cache=false
spring.thymeleaf.prefix=classpath:/templates/
spring.thymeleaf.check-template-location=true
spring.thymeleaf.suffix=.html
spring.thymeleaf.encoding=UTF-8
spring.thymeleaf.servlet.content-type=text/html
spring.thymeleaf.mode=HTML5

# default environment configuration
default.environment.name=pulsar
default.environment.service_url=http://broker:8080
default.environment.bookie_url=pulsar://broker:6650

# enable tls encryption
# keytool -import -alias test-keystore -keystore ca-certs -file certs/ca.cert.pem
tls.enabled=false
tls.keystore=keystore-file-path
tls.keystore.password=keystore-password
tls.hostname.verifier=false
tls.pulsar.admin.ca-certs=ca-client-path

# support peek message, default false
pulsar.peek.message=true

# swagger configuration
swagger.enabled=true

# casdoor configuration
casdoor.endpoint = http://localhost:8000
casdoor.clientId = 6ba06c1e1a30929fdda7
casdoor.clientSecret = df92bbf913225ebbae9af7ba8d41fe19507eb079
casdoor.certificate=\
-----BEGIN CERTIFICATE-----\n\
MIIE+TCCAuGgAwIBAgIDAeJAMA0GCSqGSIb3DQEBCwUAMDYxHTAbBgNVBAoTFENh\n\
c2Rvb3IgT3JnYW5pemF0aW9uMRUwEwYDVQQDEwxDYXNkb29yIENlcnQwHhcNMjEx\n\
MDE1MDgxMTUyWhcNNDExMDE1MDgxMTUyWjA2MR0wGwYDVQQKExRDYXNkb29yIE9y\n\
Z2FuaXphdGlvbjEVMBMGA1UEAxMMQ2FzZG9vciBDZXJ0MIICIjANBgkqhkiG9w0B\n\
AQEFAAOCAg8AMIICCgKCAgEAsInpb5E1/ym0f1RfSDSSE8IR7y+lw+RJjI74e5ej\n\
rq4b8zMYk7HeHCyZr/hmNEwEVXnhXu1P0mBeQ5ypp/QGo8vgEmjAETNmzkI1NjOQ\n\
CjCYwUrasO/f/MnI1C0j13vx6mV1kHZjSrKsMhYY1vaxTEP3+VB8Hjg3MHFWrb07\n\
uvFMCJe5W8+0rKErZCKTR8+9VB3janeBz//zQePFVh79bFZate/hLirPK0Go9P1g\n\
OvwIoC1A3sarHTP4Qm/LQRt0rHqZFybdySpyWAQvhNaDFE7mTstRSBb/wUjNCUBD\n\
PTSLVjC04WllSf6Nkfx0Z7KvmbPstSj+btvcqsvRAGtvdsB9h62Kptjs1Yn7GAuo\n\
I3qt/4zoKbiURYxkQJXIvwCQsEftUuk5ew5zuPSlDRLoLByQTLbx0JqLAFNfW3g/\n\
pzSDjgd/60d6HTmvbZni4SmjdyFhXCDb1Kn7N+xTojnfaNkwep2REV+RMc0fx4Gu\n\
hRsnLsmkmUDeyIZ9aBL9oj11YEQfM2JZEq+RVtUx+wB4y8K/tD1bcY+IfnG5rBpw\n\
IDpS262boq4SRSvb3Z7bB0w4ZxvOfJ/1VLoRftjPbLIf0bhfr/AeZMHpIKOXvfz4\n\
yE+hqzi68wdF0VR9xYc/RbSAf7323OsjYnjjEgInUtRohnRgCpjIk/Mt2Kt84Kb0\n\
wn8CAwEAAaMQMA4wDAYDVR0TAQH/BAIwADANBgkqhkiG9w0BAQsFAAOCAgEAn2lf\n\
DKkLX+F1vKRO/5gJ+Plr8P5NKuQkmwH97b8CS2gS1phDyNgIc4/LSdzuf4Awe6ve\n\
C06lVdWSIis8UPUPdjmT2uMPSNjwLxG3QsrimMURNwFlLTfRem/heJe0Zgur9J1M\n\
8haawdSdJjH2RgmFoDeE2r8NVRfhbR8KnCO1ddTJKuS1N0/irHz21W4jt4rxzCvl\n\
2nR42Fybap3O/g2JXMhNNROwZmNjgpsF7XVENCSuFO1jTywLaqjuXCg54IL7XVLG\n\
omKNNNcc8h1FCeKj/nnbGMhodnFWKDTsJcbNmcOPNHo6ixzqMy/Hqc+mWYv7maAG\n\
Jtevs3qgMZ8F9Qzr3HpUc6R3ZYYWDY/xxPisuKftOPZgtH979XC4mdf0WPnOBLqL\n\
2DJ1zaBmjiGJolvb7XNVKcUfDXYw85ZTZQ5b9clI4e+6bmyWqQItlwt+Ati/uFEV\n\
XzCj70B4lALX6xau1kLEpV9O1GERizYRz5P9NJNA7KoO5AVMp9w0DQTkt+LbXnZE\n\
HHnWKy8xHQKZF9sR7YBPGLs/Ac6tviv5Ua15OgJ/8dLRZ/veyFfGo2yZsI+hKVU5\n\
nCCJHBcAyFnm1hdvdwEdH33jDBjNB6ciotJZrf/3VYaIWSalADosHAgMWfXuWP+h\n\
8XKXmzlxuHbTMQYtZPDgspS5aK+S4Q9wb8RRAYo=\n\
-----END CERTIFICATE-----\n\

casdoor.organizationName = pulsar
casdoor.applicationName = app-pulsar
```

### Producer

Add the client dependency to `pom.xml` [3], then create a producer and send messages [4].

```java
package onehour.pulsar.examples;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class OrderProducer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("my-order")
                .create();

        // Publish 20 demo order IDs (10000 to 10019) to the order topic.
        for (int i = 10000; i < 10020; i++) {
            System.out.println("send order id :" + i);
            producer.send(String.valueOf(i));
        }

        producer.close();
        client.close();
    }
}
```

### Retry (delay) queue and dead-letter queue

![image.png](img/retry-letter-topic.png)
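The retry and dead-letter flow in the diagram can be modeled roughly in plain Java. This is a minimal in-memory sketch, not Pulsar code: the class `RetryFlowSketch`, its method names, and the counting rule (a message is dead-lettered once its retry count would exceed `maxRedeliverCount`) are assumptions made for illustration, and the order is assumed to stay unpaid on every check.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the retry / dead-letter decision (hypothetical, not Pulsar's implementation). */
class RetryFlowSketch {

    /** What happened to one order that is never paid. */
    static final class Outcome {
        final int deliveries;              // how many times the consumer saw the message
        final long secondsUntilDeadLetter; // total delay before it reached the dead-letter topic
        final List<String> events;         // human-readable trace

        Outcome(int deliveries, long secondsUntilDeadLetter, List<String> events) {
            this.deliveries = deliveries;
            this.secondsUntilDeadLetter = secondsUntilDeadLetter;
            this.events = events;
        }
    }

    /** Walks an always-unpaid order through the retry topic until it is dead-lettered. */
    static Outcome simulateUnpaidOrder(int maxRedeliverCount, long delaySeconds) {
        List<String> events = new ArrayList<>();
        int retriesSoFar = 0; // times the message has already been sent to the retry topic
        int deliveries = 0;
        long elapsed = 0;
        while (true) {
            deliveries++;
            events.add("t=" + elapsed + "s: delivery " + deliveries + ", order unpaid");
            // Assumed rule: one more retry would exceed the limit, so dead-letter instead.
            if (retriesSoFar + 1 > maxRedeliverCount) {
                events.add("t=" + elapsed + "s: retries exhausted, routed to dead-letter topic");
                return new Outcome(deliveries, elapsed, events);
            }
            retriesSoFar++;
            elapsed += delaySeconds; // the message reappears after the delay
        }
    }

    public static void main(String[] args) {
        Outcome outcome = simulateUnpaidOrder(2, 60);
        outcome.events.forEach(System.out::println);
        System.out.println("deliveries=" + outcome.deliveries
                + ", secondsUntilDeadLetter=" + outcome.secondsUntilDeadLetter);
    }
}
```

Under this model the effective order timeout is roughly `maxRedeliverCount × delay`: with a limit of 2 and a 60-second delay, the order is checked three times and reaches the dead-letter topic after about 120 seconds.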
The consumer automatically subscribes to the retry queue (delay queue) associated with the original topic. It calls `reconsumeLater` to put a message into the retry queue [5] and to specify how long to wait before the retry. A message that still has not been consumed successfully after the maximum number of retries is written to the dead-letter queue [6].

```java
Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-order")
        .subscriptionName("my-subscription")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscriptionType(SubscriptionType.Shared)
        .enableRetry(true) // enable the retry (delay) queue
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .retryLetterTopic("retry-order") // retry queue name
                .maxRedeliverCount(1) // beyond this many retries, the message goes to the dead-letter queue
                .deadLetterTopic("dead-order") // dead-letter queue name
                .initialSubscriptionName("dead-order-subscription") // subscription for timed-out orders
                .build())
        .subscribe();
```

Set the retry interval (the delay). For demonstration purposes it is set to 60 seconds here.

```java
consumer.reconsumeLater(msg, 60, TimeUnit.SECONDS);
```

### Consumer

```java
package onehour.pulsar.examples;

import org.apache.pulsar.client.api.*;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class OrderConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("my-order")
                .subscriptionName("my-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true) // enable the retry (delay) queue
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .retryLetterTopic("retry-order") // retry queue name
                        .maxRedeliverCount(2) // beyond this many retries, the message goes to the dead-letter queue
                        .deadLetterTopic("dead-order") // dead-letter queue name
                        .initialSubscriptionName("dead-order-subscription") // subscription that handles timed-out orders
                        .build())
                .subscribe();

        while (true) {
            Message<String> msg = consumer.receive();
            String orderId = msg.getValue();
            boolean isPaid = checkOrderStatus(orderId);
            if (isPaid) {
                System.out.println("order " + orderId + " is paid");
                // Order paid: acknowledge so the message counts as successfully consumed.
                consumer.acknowledge(msg);
            } else {
                // Order not paid yet: move the message to the retry queue.
                System.out.println("order " + orderId + " is not paid");
                // The consumer is subscribed to the retry queue automatically, so the order
                // is checked again after 60 s. Once the retries allowed by maxRedeliverCount
                // are used up and the order is still unpaid, the message moves to the
                // dead-letter queue.
                consumer.reconsumeLater(msg, 60, TimeUnit.SECONDS);
            }
        }
    }

    private static boolean checkOrderStatus(String orderId) {
        // Placeholder for a database lookup; returns a random result for the demo.
        return new Random().nextBoolean();
    }
}
```

### Management UI

Open [http://localhost:9527](http://localhost:9527) in a browser to view the messages in the topics.
Username: `admin`, password: `apachepulsar`
![image.png](img/pulsar-manager.png)

### Automatically closing orders

```java
package onehour.pulsar.examples;

import org.apache.pulsar.client.api.*;

public class OrderCloseConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Consume the dead-letter topic: every message here is an order that timed out.
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("dead-order")
                .subscriptionName("dead-order-subscription")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        while (true) {
            Message<String> msg = consumer.receive();
            String orderId = msg.getValue();
            System.out.println("close order " + orderId);
            // Acknowledge so the closed order is not delivered again after a restart.
            consumer.acknowledge(msg);
        }
    }
}
```

References:
[1] [https://pulsar.apache.org/docs/3.0.x/getting-started-docker-compose/](https://pulsar.apache.org/docs/3.0.x/getting-started-docker-compose/)
[2] [https://pulsar.apache.org/docs/3.0.x/administration-pulsar-manager/#configuration](https://pulsar.apache.org/docs/3.0.x/administration-pulsar-manager/#configuration)
[3] [https://pulsar.apache.org/docs/3.0.x/client-libraries-java-setup/](https://pulsar.apache.org/docs/3.0.x/client-libraries-java-setup/)
[4] [https://pulsar.apache.org/docs/3.0.x/client-libraries-java-use/](https://pulsar.apache.org/docs/3.0.x/client-libraries-java-use/)
[5] [https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic](https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#retry-letter-topic)
[6] [https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#dead-letter-topic](https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#dead-letter-topic)