Building your blockchain, deploying your executables, launching a token and swapping with other blockchains, and a first pass at a DSL syntax

Visit the Current Working Branch on Github and pull to run the following examples, they can also be deployed onto local clusters by running docker-compose up in the main directory. This is currently an aplha and our beta release will include a refactor that repackages the cell-api functionality as a dependency which can be loaded into any jvm project. For now, development in the examples directory mimicks the package structure the beta will have and any code there can be copied pasted over into your own repository fairly simply.

Cell Monad

At the core of everything is the Cell monad, within which lives consensus. The first step to building your blockchain is to implement a consensus algorithm within a Cell monad, or add logic to an existing cell, which can be done using the Cell Monoid below.

There are two main parts to a Cell, a CoAlgebra which builds up a computation structure and an Algebra which tears it down and returns the result. Both are partial functions, pattern matching on different types which represent the stages in your consensus. All stage types must inherit Ω like L0CellInput. With these constraints, we can pass the Co/Algebra into a recursion scheme, which compile time checks the stages of your consensus and throws an error if its possible that it would fail in an unhandled way.


sealed trait L0CellInput extends Ω

object L0CellInput extends Ω {
  case class HandleDAGL1(data: Signed[DAGBlock]) extends L0CellInput
  case class HandleStateChannelSnapshot(snapshot: StateChannelOutput) extends L0CellInput
  case class Empty() extends L0CellInput
}

class L0Cell[F[_]: Async](
  data: L0CellInput,
  l1OutputQueue: Queue[F, Signed[DAGBlock]],
  stateChannelOutputQueue: Queue[F, StateChannelOutput]
) extends Cell[F, StackF, CoalgebraCommand, L0CellInput, Either[CellError, Ω]](
      data,
      scheme.hyloM(
        AlgebraM[F, StackF, Either[CellError, Ω]] {
          case More(a) => a.pure[F]
          case Done(Right(cmd: AlgebraCommand)) =>
            cmd match {
              case EnqueueStateChannelSnapshot(snapshot) =>
                Algebra.enqueueStateChannelSnapshot(stateChannelOutputQueue)(snapshot)
              case EnqueueDAGL1Data(data) =>
                Algebra.enqueueDAGL1Data(l1OutputQueue)(data)
              case _ =>
                NullTerminal.asRight[CellError].widen[Ω].pure[F]
            }
          case Done(other) => other.pure[F]
        },
        CoalgebraM[F, StackF, Ω] {
          case ProcessDAGL1(data)                    => Coalgebra.processDAGL1(data)
          case ProcessStateChannelSnapshot(snapshot) => Coalgebra.processStateChannelSnapshot(snapshot)
          case _                                     => Coalgebra.empty()
        }
      ),
      {
        case HandleDAGL1(data)                    => ProcessDAGL1(data)
        case HandleStateChannelSnapshot(snapshot) => ProcessStateChannelSnapshot(snapshot)
        case _                                    => Empty()
      }
    )

object L0Cell {
  type Mk[F[_]] = L0CellInput => L0Cell[F]

  def mkCell[F[_]: Async](
    l1OutputQueue: Queue[F, Signed[DAGBlock]],
    stateChannelOutputQueue: Queue[F, StateChannelOutput]
  ): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = data =>
    new Cell[F, StackF, Ω, Ω, Either[CellError, Ω]](
      data,
      scheme.hyloM(
        AlgebraM[F, StackF, Either[CellError, Ω]] {
          case More(a) => a.pure[F]
          case Done(Right(cmd: AlgebraCommand)) =>
            cmd match {
              case EnqueueStateChannelSnapshot(snapshot) =>
                Algebra.enqueueStateChannelSnapshot(stateChannelOutputQueue)(snapshot)
              case EnqueueDAGL1Data(data) =>
                Algebra.enqueueDAGL1Data(l1OutputQueue)(data)
              case _ =>
                NullTerminal.asRight[CellError].widen[Ω].pure[F]
            }
          case Done(other) => other.pure[F]
        },
        CoalgebraM[F, StackF, Ω] {
          case ProcessDAGL1(data)                    => Coalgebra.processDAGL1(data)
          case ProcessStateChannelSnapshot(snapshot) => Coalgebra.processStateChannelSnapshot(snapshot)
          case _                                     => Coalgebra.empty()
        }
      ),
      {
        case HandleDAGL1(data)                    => ProcessDAGL1(data)
        case HandleStateChannelSnapshot(snapshot) => ProcessStateChannelSnapshot(snapshot)
        case _                                    => Empty()
      }
    )

  def mkL0Cell[F[_]: Async](
    l1OutputQueue: Queue[F, Signed[DAGBlock]],
    stateChannelOutputQueue: Queue[F, StateChannelOutput]
  ): Mk[F] =
    data => new L0Cell(data, l1OutputQueue, stateChannelOutputQueue)

  type AlgebraR[F[_]] = F[Either[CellError, Ω]]
  type CoalgebraR[F[_]] = F[StackF[Ω]] // F[StackF[CoalgebraCommand]]

  object Algebra {

    def enqueueStateChannelSnapshot[F[_]: Async](
      queue: Queue[F, StateChannelOutput]
    )(snapshot: StateChannelOutput): AlgebraR[F] =
      queue.offer(snapshot) >>
        NullTerminal.asRight[CellError].widen[Ω].pure[F]

    def enqueueDAGL1Data[F[_]: Async](queue: Queue[F, Signed[DAGBlock]])(data: Signed[DAGBlock]): AlgebraR[F] =
      queue.offer(data) >>
        NullTerminal.asRight[CellError].widen[Ω].pure[F]
  }

  object Coalgebra {

    def processDAGL1[F[_]: Async](data: Signed[DAGBlock]): CoalgebraR[F] = {
      def res: StackF[Ω] = Done(AlgebraCommand.EnqueueDAGL1Data(data).asRight[CellError])

      res.pure[F]
    }

    def processStateChannelSnapshot[F[_]: Async](snapshot: StateChannelOutput): CoalgebraR[F] = {
      def res: StackF[Ω] = Done(AlgebraCommand.EnqueueStateChannelSnapshot(snapshot).asRight[CellError])

      res.pure[F]
    }

    def empty[F[_]: Async](): CoalgebraR[F] = {
      def res: StackF[Ω] = Done(NoAction.asRight[CellError])

      res.pure[F]
    }
  }
}

State Channel Example

State channels are basically just an fs2 api for Cells. You can have cells created within endpoints (like how above are invoked) or within a StateChannel class, within which you can build a streaming pipeline, piping data through Cells according to your own network topology. The best example is in org.reality.dag.l1.L1.scala, which implements one of the two Reality Cells. Also there is EthStateChannel.scala that has an example of how to build a bridge from another L1 to your blockchain.


class L1[F[_]: Async: SecurityProvider: Random](
  override val appConfig: AppConfig,
  override val blockAcceptanceS: Semaphore[F],
  override val blockCreationS: Semaphore[F],
  override val blockStoringS: Semaphore[F],
  override val keyPair: KeyPair,
  p2pClient: L1P2PClient[F],
  override val programs: L1Programs[F],
  override val queues: L1Queues[F],
  override val selfId: PeerId,
  override val services: L1Services[F],
  override val storages: L1Storages[F],
  override val validators: Validators[F],
  override val cellObj: StateChannelCell
) extends StateChannel[F, BlockConsensusInput, FinalBlock](
      appConfig: AppConfig,
      blockAcceptanceS: Semaphore[F],
      blockCreationS: Semaphore[F],
      blockStoringS: Semaphore[F],
      keyPair: KeyPair,
      p2pClient: L1P2PClient[F],
      programs: L1Programs[F],
      queues: L1Queues[F],
      selfId: PeerId,
      services: L1Services[F],
      storages: L1Storages[F],
      validators: Validators[F],
      cellObj: StateChannelCell
    ) {
  println("creating L1 ")
  implicit val logger = Slf4jLogger.getLogger[F]

//  val oneOffExe = new obj {}
  implicit val blockConsensusContext: BlockConsensusContext[F] =
    BlockConsensusContext[F](
      p2PClient.blockConsensus,
      storages.block,
      validators.block,
      storages.cluster,
      org.reality.dag.l1.domain.consensus.block.config.ConsensusConfig(), // todo set config here,
      storages.consensus,
      keyPair,
      selfId,
      storages.transaction,
      validators.transaction
    )

//  val myExecutable_ => Omega = new WasmSandbox{}
  override val cell: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = cellObj.mkCell(blockConsensusContext)

  private val inspectionTriggerInput: Stream[F, OwnerBlockConsensusInput] = Stream
    .awakeEvery(5.seconds)
    .as(InspectionTrigger)

  private val ownRoundTriggerInput: Stream[F, OwnerBlockConsensusInput] = Stream
    .awakeEvery(5.seconds)
    .evalMapLocked(blockCreationS) { _ =>
      canStartOwnConsensus(
        storages.consensus,
        storages.node,
        storages.cluster,
        storages.block,
        storages.transaction,
        org.reality.dag.l1.domain.consensus.block.config.ConsensusConfig().peersCount,
        org.reality.dag.l1.domain.consensus.block.config.ConsensusConfig().tipsCount
      ).handleErrorWith { e =>
        logger.warn(e)("Failure checking if own consensus can be kicked off!").map(_ => false)
      }
    }
    .filter(identity)
    .as(OwnRoundTrigger)

  val l0PeerDiscovery: Stream[F, Unit] = Stream
    .awakeEvery(10.seconds)
    .evalMap { _ =>
      storages.lastGlobalSnapshotStorage.get.flatMap {
        _.fold(Applicative[F].unit) { latestSnapshot =>
          programs.l0PeerDiscovery.discover(latestSnapshot.signed.proofs.map(_.id).map(PeerId._Id.reverseGet))
        }
      }
    }

  private val ownerBlockConsensusInputs: Stream[F, OwnerBlockConsensusInput] =
    inspectionTriggerInput.merge(ownRoundTriggerInput)

  private val peerBlockConsensusInputs: Stream[F, PeerBlockConsensusInput] = Stream
    .fromQueueUnterminated(queues.peerBlockConsensusInput)
    .evalFilter(isPeerInputValid(_))
    .map(_.value)

  var blockConsensusInputs: Stream[F, BlockConsensusInput] =
    ownerBlockConsensusInputs.merge(peerBlockConsensusInputs)

  val runConsensus: Pipe[F, BlockConsensusInput, FinalBlock] =
    _.evalTap(input => logger.debug(s"Received block consensus input to process: ${input.show}"))
      .evalMap(
        //        new BlockConsensusCell(_, blockConsensusContext) // cell(_, blockConsensusContext)
        cell(_)
          .run()
          .handleErrorWith(e => CellError(e.getMessage).asLeft[Ω].pure[F])
      )
      .flatMap {
        case Left(ce) =>
          Stream.eval(logger.warn(ce)(s"Error occurred during some step of block consensus.")) >>
            Stream.empty
        case Right(ohm) =>
          ohm match {
            case fb @ FinalBlock(hashedBlock) =>
              Stream
                .eval(logger.debug(s"FinalBlock created! Hash=${hashedBlock.hash} ProofsHash=${hashedBlock.proofsHash} FinalBlock=${fb}"))
                .as(fb)
            case CleanedConsensuses(ids) =>
              Stream.eval(logger.warn(s"Cleaned following timed-out consensuses: $ids")) >>
                Stream.empty
            case NullTerminal => Stream.empty
            case other =>
              Stream.eval(logger.warn(s"Unexpected ohm in block consensus occurred: $other")) >>
                Stream.empty
          }
      }

  val gossipBlock: Pipe[F, FinalBlock, FinalBlock] =
    _.evalTap { fb =>
      services.gossip
        .spreadCommon(fb.hashedBlock.signed)
        .handleErrorWith(e => logger.warn(e)("Block gossip spread failed!"))
    }

  val peerBlocks: Stream[F, FinalBlock] = Stream
    .fromQueueUnterminated(queues.peerBlock)
    .evalMap(_.toHashedWithSignatureCheck)
    .evalTap {
      case Left(e)  => logger.warn(e)(s"Received an invalidly signed peer block!")
      case Right(_) => Async[F].unit
    }
    .collect {
      case Right(hashedBlock) => FinalBlock(hashedBlock)
    }

  val storeBlock: Pipe[F, FinalBlock, Unit] =
    _.evalMapLocked(blockStoringS) { fb =>
      storages.lastGlobalSnapshotStorage.getHeight.map(_.getOrElse(Height.MinValue)).flatMap { lastSnapshotHeight =>
        if (lastSnapshotHeight < fb.hashedBlock.height)
          storages.block.store(fb.hashedBlock).handleErrorWith(e => logger.debug(e)("Block storing failed."))
        else
          logger.debug(
            s"Block can't be stored! Block height not above last snapshot height! block:${fb.hashedBlock.height} <= snapshot: $lastSnapshotHeight"
          )
      }
    }

  val sendBlockToL0: Pipe[F, FinalBlock, FinalBlock] =
    _.evalTap { fb =>
      for {
        l0PeerOpt <- storages.l0Cluster.getPeers
          .map(_.toNonEmptyList.toList)
          .flatMap(_.filterA(p => services.collateral.hasCollateral(p.id)))
          .flatMap(peers => Random[F].shuffleList(peers))
          .map(peers => peers.headOption)

        _ <- l0PeerOpt.fold(logger.warn("No available L0 peer")) { l0Peer =>
          p2PClient.l0DAGCluster
            .sendL1Output(fb.hashedBlock.signed)(l0Peer)
            .ifM(Applicative[F].unit, logger.warn("Sending block to L0 failed."))
        }
      } yield ()
    }

  val blockAcceptance: Stream[F, Unit] = Stream
    .awakeEvery(1.seconds)
    .evalMapLocked(blockAcceptanceS) { _ =>
      storages.block.getWaiting.flatTap { awaiting =>
        if (awaiting.nonEmpty) logger.debug(s"Pulled following blocks for acceptance ${awaiting.keySet}")
        else Async[F].unit
      }.flatMap(
        _.toList
          .sortBy(_._2.value.height)
          .traverse {
            case (hash, signedBlock) =>
              logger.debug(s"Acceptance of a block $hash starts!") >>
                services.block
                  .accept(signedBlock)
                  .handleErrorWith(logger.warn(_)(s"Failed acceptance of a block with ${hash.show}"))
          }
          .void
      )
    }

  val globalSnapshotProcessing: Stream[F, Unit] = Stream
    .awakeEvery(10.seconds)
    .evalMap(_ => services.l0.pullGlobalSnapshots)
    .evalTap { snapshots: Either[(Hashed[GlobalSnapshot], GlobalSnapshotInfo), List[Hashed[GlobalSnapshot]]] =>
      def log(snapshot: Hashed[GlobalSnapshot]) =
        logger.info(s"Pulled following global snapshot: ${GlobalSnapshotReference.fromHashedGlobalSnapshot(snapshot).show}")

      snapshots match {
        case Left((snapshot, _)) => log(snapshot)
        case Right(snapshots)    => snapshots.traverse(log).void
      }
    }
    .evalMapLocked(NonEmptyList.of(blockAcceptanceS, blockCreationS, blockStoringS)) {
      case Left((snapshot, state)) =>
        programs.snapshotProcessor.process((snapshot, state).asLeft[Hashed[GlobalSnapshot]]).map(List(_))
      case Right(snapshots) =>
        (snapshots, List.empty[SnapshotProcessingResult]).tailRecM {
          case (snapshot :: nextSnapshots, aggResults) =>
            programs.snapshotProcessor
              .process(snapshot.asRight[(Hashed[GlobalSnapshot], GlobalSnapshotInfo)])
              .map(result => (nextSnapshots, aggResults :+ result).asLeft[List[SnapshotProcessingResult]])

          case (Nil, aggResults) =>
            aggResults.asRight[(List[Hashed[GlobalSnapshot]], List[SnapshotProcessingResult])].pure[F]
        }
    }
    .evalMap {
      _.traverse(result => logger.info(s"Snapshot processing result: ${result.show}")).void
    }

  val blockConsensus: Stream[F, Unit] =
    blockConsensusInputs
      .through(runConsensus)
      .through(gossipBlock)
      .through(sendBlockToL0)
      .merge(peerBlocks)
      .through(storeBlock)

  val runtime: Stream[F, Unit] =
    blockConsensus
      .merge(blockAcceptance)
      .merge(globalSnapshotProcessing)
      .merge(l0PeerDiscovery)
}

abstract class StateChannel[F[_]: Async: SecurityProvider: Random, A, B](
  val appConfig: AppConfig,
  val blockAcceptanceS: Semaphore[F],
  val blockCreationS: Semaphore[F],
  val blockStoringS: Semaphore[F],
  val keyPair: KeyPair,
  val p2PClient: L1P2PClient[F],
  val programs: L1Programs[F],
  val queues: L1Queues[F],
  val selfId: PeerId,
  val services: L1Services[F],
  val storages: L1Storages[F],
  val validators: Validators[F],
  val cellObj: StateChannelCell
) {
  val cell: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]]
  val runConsensus: Pipe[F, A, B]
  val runtime: Stream[F, Unit]
  implicit val blockConsensusContext: BlockConsensusContext[F]
}

object L1 extends MkStateChannel {
  val cellObj = BlockConsensusCell
  def make[F[_]: Async: SecurityProvider: Random](
    appConfig: AppConfig,
    keyPair: KeyPair,
    p2pClient: L1P2PClient[F],
    programs: L1Programs[F],
    queues: L1Queues[F],
    selfId: PeerId,
    services: L1Services[F],
    storages: L1Storages[F],
    validators: Validators[F],
    mkCell: StateChannelCell
  ): F[StateChannel[F, _, _]] =
    for {
      blockAcceptanceS <- Semaphore(1)
      blockCreationS <- Semaphore(1)
      blockStoringS <- Semaphore(1)
    } yield
      new L1[F](
        appConfig,
        blockAcceptanceS,
        blockCreationS,
        blockStoringS,
        keyPair,
        p2pClient,
        programs,
        queues,
        selfId,
        services,
        storages,
        validators,
        mkCell
      )
}


Combined Example

The easiest way to get started is to add new logic to an existing Cell. And the easiest way to do that is to make a new Cell with the logic you'd like (like EmptyCellObj below) and add it on to a pre-baked Cell like L0 using the Cell Monoid. A monoid is a fancy + operation which combines the logic of two cells into one. You can then instantiate and run them within a Main object which extends Portal (trait contiaining everything needed for setup) by invoking them with val cellProgram. In this specific example, two Cells are defined within CoCells and run, being added together into a list.

CoCells are special in that they are the context wtihin the runtime lives, or rather from which Cells are created to process various stages in consensus. When writing your own custom blockchain, special configuration and additional logic can be applied and combined across CoCells, using the logic/Cells from multiple CoCells in the implementation of a blockchain. This modularity extends beyond CoCells to any L1 which can be intergrated into CoCells for cross chain swaps and cross-network dApps.

/** CoCells are Cell factories, on input they build up cell monads which are later torn down. This oneCalls a Cell, adds it to another Cell,
* without type checking. This is good for just adding functionality to already built Cells.
*/
case class CombinedL0() extends CoCell {
def mkCell[F[_]: Async](
 l1OutputQueue: Queue[F, Signed[DAGBlock]],
 stateChannelOutputQueue: Queue[F, StateChannelOutput]
): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = data => {
 val left: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = L0Cell.mkCell(l1OutputQueue, stateChannelOutputQueue)
 val right: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = EmptyCellObj.mkCell
 Cell.cellMonoid[F, StackF].combine(left(data), right(data))
}
override def argsToStartUp(args: List[String]) = StartUp(args)
}

/** Calls a Statechannel, adds it to another Statechannel. Statechannels are an fs2 (stream) api which create and object, within which you
* can build a pipeline around your cell
*/
case class CombinedStateChannel() extends CoCell {
val stateChannel: MkStateChannel = NetStateChannel
override def mkResources[A <: CliMethod]: (A, SDK[IO]) => Resource[IO, HttpApi[IO]] =
 L1HttpApi.mkResources(_: A, _: SDK[IO])
def mkCell[F[_]: Async: SecurityProvider: Random](
 ctx: BlockConsensusContext[F]
): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = BlockConsensusCell.mkCell[F](ctx)
override def setup(args: List[String]) =
 setupCombined[org.reality.dag.l1.cli.Run](
   stateChannels,
   this,
   argsToStartUp(args).l1method,
   org.reality.dag.l1.cli.method.opts,
   L1HttpApi.mkResources(_: org.reality.dag.l1.cli.Run, _: SDK[IO])
 )
}

class CombinedMonoidExample extends Portal {
import CoCell._
println("correctMain")
val combinedL0 = CombinedL0()
val combinedStateChannel = CombinedStateChannel()
val mergedCells = combinedL0 ++ combinedStateChannel

/** You have to put this here, it is the thing that goes to the runtime
 */
val cellProgram = (args: List[String]) =>
 for {
   cellInstances <- mergedCells.map(_.setup(args)).sequence
 } yield cellInstances

}

Compose Example

Next we have the compose operator <~> with is another monoid on CoCells, but which composes their attributes and returning them within a single CoCell. This is actually really an Arrow, or rather an instance of the category of CoCells which can be used to flatten chains of them into a single CoCell.

This is handy for keeping code clean as you can reduce your entire blockchain List[CoCell] into a single CoCell, or first-class object. With this, we can pass around your blockchain as a first class object, pipe data through it, hold or return the result within an IO. Also this is handy for testing the actual runtime setup of your CoCells as it reduces all setups into one process.

trait CompositeCell extends CoCell {}

class ComposeExamples extends Portal {
import CoCell.toCoHom
val netL0 = new NetL0 {}
val combinedStateChannel = new NetStateChannel {}

/** In Combine we loaded the CoCells into runtime. Here, we can do more stuff to create "first-class objects" out of computation using an
  * Arrow (>>>) which chains them and returns an object which can handle input and be passed around. It also does some extra type checking
  * which we will use in making proofs.
  */
val computationObject: Ω => CoHomF[Ω] = new CompositeCell {}.ohmToCoHom >>> netL0.ohmToCoHom >>> combinedStateChannel.ohmToCoHom

val test1: CoHomF[Ω] => CoHomF[Ω] = new CompositeCell {}.mixHomCoHom[Ω] >>> netL0.mixHomCoHom[Ω] >>> combinedStateChannel.mixHomCoHom[Ω]

val tryChain: CoHomF[Ω] = new CompositeCell {} <~> netL0 <~> combinedStateChannel
val tryChain2 = new CompositeCell {} <~> netL0 <~> combinedStateChannel

val composeChain: CoHomF[Ω] => CoHomF[Ω] = tryChain.mixHomCoHom[Ω] >>> tryChain2.mixHomCoHom[Ω]

def compile(terminal: CoHomF[Ω], chain: CoHomF[Ω] => CoHomF[Ω]) = chain(terminal)

/** Pass in first class objects that inherit from Ω, so scripts, streams etc.
  */
val dummyBlock: Ω = InformAboutRoundStartFailure("")

/** Here's a computation object, like a Future, which returns a result once it's finished, it can be transpiled via the transverse
  * operator in coHomToListCoCell to a CoCell and passed to runtime.
  */
val actionObject: CoHomF[Ω] = computationObject(dummyBlock)

val test = fixed2(actionObject)
val newComposed = netL0 <~> combinedStateChannel

val cellProgram = (args: List[String]) =>
  for {
    cellInstances: CoCell.Context[CoCell] <- newComposed.coCell.setup(args)
  } yield cellInstances :: Nil
}


CoHom CoMonad

The CoHom CoMonad is a companion class to CoCell from which the 'cell api', which manages interop between your consensus object and your wider program (the actuall HTTP server that gets launched and is your node). The key here is coFlatmap, which allows for the created CoCell object to be accessed, computing results on input in real time. These first class objects could, for instance, be created within a repl and custom "node" implementations can be developed/tested/deployed in real time.

Below, we have executedResult which gives a Cell instance created based on the input of dummyCell and computeWithStateChannelCell. This actually lifts into the IO of the node program allowing each CoCell to be accessed, consensus to be performed with specific input and the return an IO of the running operation on a live internal server.

package org.reality.combined.examples

import cats.effect.std.Queue
import cats.effect.{Async, IO}
import cats.implicits.toTraverseOps

import org.reality.combined._
import org.reality.dag.domain.block.DAGBlock
import org.reality.dag.l1.MkStateChannel
import org.reality.dag.l1.domain.consensus.block.AlgebraCommand.InformAboutRoundStartFailure
import org.reality.domain.cell.L0Cell
import org.reality.kernel._
import org.reality.security.signature.Signed
import org.reality.statechannel.StateChannelOutput

import org.http4s.HttpRoutes

trait NetL0 extends CoCell {

  def mkCell[F[_]: Async](
    implicit l1OutputQueue: Queue[F, Signed[DAGBlock]],
    stateChannelOutputQueue: Queue[F, StateChannelOutput]
  ): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] =
    L0Cell.mkCell[F](l1OutputQueue, stateChannelOutputQueue)
  override def argsToStartUp(args: List[String]) = StartUp(args)
}

object NetStateChannel extends MkStateChannel

trait NetStateChannel extends CoCell {
  override val stateChannels: List[MkStateChannel] = NetStateChannel :: Nil
  override def mkResources[A <: CliMethod]: (A, SDK[IO]) => Resource[IO, HttpApi[IO]] = L1HttpApi.mkResources(_: A, _: SDK[IO])
  override def getEndpoints[F[_]: Async]: List[(List[HttpRoutes[F]], List[HttpRoutes[F]])] = List((Nil, Nil))
}

class CellApiExamples extends Portal {

  val myL0CoCell = new NetL0 {}
  val myStateChannelCoCell = new NetStateChannel {}
  val cells = myL0CoCell <~> myStateChannelCoCell

  val otherAppPipeline = myL0CoCell :: myStateChannelCoCell

  val cellChain = cells <~> otherAppPipeline.reduce(_ <~> _)

  val cellProgram: List[String] => IO[Seq[CoCell.Context[CoCell]]] = (args: List[String]) =>
    for {
      cellInstances: CoCell.Context[CoCell] <- cellChain.setup(args) // cellChain.map(_.setup(args)).sequence
    } yield cellInstances :: Nil

  /*
Pass in first class objects that inherit from Ω, so scripts, streams etc.
   */
  val dummyBlock: Ω = InformAboutRoundStartFailure("")

  /** You can pass input into a Cell by coFlatmapping over it, this returns the Cell, unexecuted (run() executes)
    */
  val executedResult = myL0CoCell.coMonad.coflatMap(myL0CoCell.ohmToCoHom(dummyBlock))(_.coCell.mkCell[IO]).start(dummyBlock)

  /** You can turn your runtime into a Future of some computation, basically you can make a meta-program out of all the consensus processes
    * you want.
    */
  val computeWithStateChannelCell: IO[Option[Either[CellError, Ω]]] = cellProgram(Nil).flatMap { contexts: Seq[CoCell.Context[CoCell]] =>
    val coCellContext = CoCell
      .getCoCellByName(contexts)(MixedCoCell.getClass.getName)
      .map { c: Context[CoCell] =>
        val cellInstance: CoHomF[Ω => Cell[IO, StackF, Ω, Ω, Either[CellError, Ω]]] = c.coFlatMap(_.mkCell[IO])
        val cellProgram = cellInstance.start(c).run()
        cellProgram
      }
      .sequence
    coCellContext
  }
} 

Topos Example

A Topos is a special construct which allows for logical reasoning about stateful computation. We can use it to pass configuration between CoCells, allowing them to reason about what configuration they need to share between eachother, in order to connect to multiple clusters via multiple CoCells, but do it all implicitly (setting automatically). This is another gift of the type system in addition to compile time checks of runtime errors (see Cell section above). Other logic, such as rules for cross chain swaps(Swap) or oracalizing external data and performing actions based on pre-baked logic like stop-loss orders or only accepting transactions based on the weather (Parlay).

Parlays are named as such because they are like add-ons to a bet but instead just add ons to your consensus pipeline, which can be stacked or applied selectively according to the 'braid' rules in the method mergeCoData. Braiding is special because it enforces geometric rules i.e. making a manifold out of the computation across CoCells which constitute a blockchain. This is important because it allow for certain performace gurantees (SLAa) of various blockchains to be quantified. It's also worth mentioning that the covariance in mergeCoData handles topological/dimensional compatibility, meaning that Reality blockchains are constructed from the tensor product of a graded monoidal category, or rather a Day Convolution, which can be used to convert different representations of CoCells into the Runtime representation, like an interpreter.

Rules are similar to Parlays, but also carry information about Reality Checking (rules for proof validation, see below). They can also carry governance implementations, which would make updates to the actual Laws of your blockchain using proofs of voting rules implemented inside of Rule case classes.

Note also that all of the above equates to mergeCoData allowing you to determine when if and how to merge together CoData across CoCells to be used within a CoCell, as you can see below.



/** @param coData
  * @param coCell
  */
case class CoTopos(coData: CoData, coCell: CoCell)

/** @param run
  * @tparam A
  * @tparam B
  */
case class Topos[A, B](val run: A => (Topos[A, B], B)) extends Hom[A, B] {
  implicit val rFunctor: Functor[Hom[A, *]] = Topos.rFunctor[A]
}

object Topos {
  import CoData.monoid
  implicit val arrowInstance: Arrow[Topos] = new Arrow[Topos] {
    override def lift[A, B](f: A => B): Topos[A, B] = Topos[A, B](lift(f) -> f(_))
    override def first[A, B, C](fa: Topos[A, B]): Topos[(A, C), (B, C)] =
      Topos[(A, C), (B, C)] {
        case (a, c) =>
          val (fa2, b) = fa.run(a)
          (first(fa2), (b, c))
      }
    override def compose[A, B, C](f: Topos[B, C], g: Topos[A, B]): Topos[A, C] = {
      def morph(a: A) = {
        val (gg, b) = g.run(a)
        val (ff, c) = f.run(b)
        (compose(ff, gg), c)
      }
      Topos[A, C](morph(_))
    }
  }

  def runList[A, B](ff: Topos[A, B], as: List[A]): List[B] = as match {
    case h :: t =>
      val (ff2, b) = ff.run(h)
      b :: runList(ff2, t)
    case _ => List()
  }

  def accum[A, B](b: B)(f: (A, B) => B): Topos[A, B] = Topos[A, B] { a: A =>
    val b2 = f(a, b)
    val run = (_: A) => (this, b2)
    (accum(b2)(f), b2)
  }
  def braid: Topos[CoCell, CoTopos] = combine(mergeCoData, mergeCells) >>> Arrow[Topos].lift {
    case (x, y) => CoTopos(x, y)
  }

  def run(coCells: List[CoCell]) = runList(braid, coCells)

  def sum[Z: Monoid]: Topos[Z, Z] = accum(Monoid[Z].empty)(_ |+| _)

  def count[Z]: Topos[Z, Int] = arrowInstance.lift((_: Z) => 1) >>> sum

  // todo override def to pattern match on proper cocell to combine with
  def mergeCoData: Topos[CoCell, CoData] = accum(Monoid[CoData].empty) {
    // todo use rules to determine if they match? x.coCell.coData.rules.intersect(y.rules)
    case (x: Dex, y) =>
      println(s"(x: Dex, y: Dex) x $x y $y")
      println(s" x.coData.copy(parlays = y.parlays) ${x.coData.copy(parlays = y.parlays)}")
      x.coData//this adds nothing
//      x.coData.copy(parlays = y.parlays)
//    case (x: Dex, y: CoCell) =>
//      println(s"(x: Dex, y: CoCell) x $x y $y")
//
//      monoid.empty
//    case (x: CoCell, y: Dex) =>
//      println(s"(x: CoCell, y: Dex) x $x y $y")
//      monoid.combine(x.coData, y)
    case (x: CoCell, y) =>
      println(s"(x: CoCell, y: CoCell) x $x y $y")
      monoid.combine(x.coData, y)
//    case _ =>
//      println(s"_ monoid.empty")
//      monoid.empty
  }

  def mergeCells: Topos[CoCell, CoCell] = accum(Monoid[CoCell].empty)(coCellMonoid.combine)

  def combine[F[_, _]: Arrow, A, B, C](fab: F[A, B], fac: F[A, C]): F[A, (B, C)] =
    Arrow[F].lift((a: A) => (a, a)) >>> (fab *** fac)

  def combineImplicit[F[_, _]: Arrow, A, B, C](fab: F[A, B], fac: F[A, C]): F[A, (B, C)] = {
    val fa = implicitly[Arrow[F]]
    fa.lmap[(A, A), (B, C), A](fa.split[A, B, A, C](fab, fac))(a => (a, a))
  }

  /** similar to the combine function with the addition of running a function on the result of combine
    *
    * @param fab
    * @param fac
    * @param f
    * @tparam F
    * @tparam A
    * @tparam B
    * @tparam C
    * @tparam D
    * @return
    */
  def liftA2[F[_, _]: Arrow, A, B, C, D](fab: F[A, B], fac: F[A, C])(f: B => C => D): F[A, D] = {
    val fa = implicitly[Arrow[F]]
    combine[F, A, B, C](fab, fac).rmap { case (b, c) => f(b)(c) }
  }

  implicit def rFunctor[A]: Functor[Hom[A, *]] = new Functor[Hom[A, *]] {
    override def map[B, C](fa: Hom[A, B])(f: B => C): Hom[A, C] =
      fa match {
        case EmptyCell()           => EmptyCell()
        case TypedCoCellCons(a, b) => TypedCoCellCons(a, f(b))
        case _                     => EmptyCell()
      }
  }
}

/** make type in CoData _: HttpApi[F] => Http4sDsl[F], make sure it picks up implicit in HttpApi
  * @param async$F$0
  * @param securityProvider$F$1
  * @param nodeApi
  * @tparam F
  */
abstract class DataTransactionRoutes[F[_]: Async: SecurityProvider](implicit nodeApi: HttpApi[F]) extends AdditionalRoutes[F] {

  case class SendRecordDataTransactionParams(
    destination: Address,
    fee: TransactionFee,
    data: String
  )

  private def createRecordDataTransactionF(
    nodeApi: HttpApi[F],
    params: SendRecordDataTransactionParams,
    parent: TransactionReference = TransactionReference.empty
  ): F[Json] = {
    val nodePublicAddress = nodeApi.selfId.toAddress
    val dataBytes = Base64.getDecoder.decode(params.data)
    val amount = TransactionAmount(PosLong.MinValue)
    val fee = TransactionFee(NonNegLong.MinValue)
    val salt = Random.scalaUtilRandom.map(_.nextLong.map(TransactionSalt.apply))
    val recordDataTransaction: F[RecordDataTransaction] =
      nodePublicAddress.flatMap(npa => salt.flatMap(s => s.map(RecordDataTransaction(npa, params.destination, "", amount, fee, parent, _))))

    for {
      pk <- nodeApi.selfId.value.toPublicKey
      rdt <- recordDataTransaction
      signedTx: Signed[RecordDataTransaction] <- Signed.forAsyncJson(rdt, new KeyPair(pk, nodeApi.privateKey))
      hashedTx: Hashed[RecordDataTransaction] <- signedTx.toHashed
    } yield
      Json.obj(
        "status" -> Json.fromString("Transaction Sent"),
        "transactionHash" -> Json.fromString(hashedTx.hash.toString)
      )

  }

  def handleActionF(
    nodeApi: HttpApi[F],
    action: String,
    params: Json
  ): F[Json] =
    action match {
      case "sendRecordDataTransaction" =>
        params.as[SendRecordDataTransactionParams] match {
          case Right(validParams) => createRecordDataTransactionF(nodeApi, validParams)
          case _                  => Json.obj().pure[F]
        }
      case _ => Json.obj().pure[F]

    }
  private def handleRequest(nodeApi: HttpApi[F], request: Request[F]): F[Either[DecodingFailure, F[Json]]] =
    for {
      json <- request.as[Json]
      action: Result[String] <- json.hcursor.get[String]("action").pure[F]
      payload: Result[Json] <- json.hcursor.get[Json]("payload").pure[F]
      apiResponse: Either[DecodingFailure, F[Json]] <- action.flatMap(str => payload.map(handleActionF(nodeApi, str, _))).pure[F]
    } yield apiResponse

  private val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case req @ POST -> Root / "api" =>
      handleRequest(nodeApi, req).flatMap {
        case Right(result)      => Ok(result)
        case Left(errorMessage) => BadRequest(Json.obj("error" -> Json.fromString(errorMessage.toString())))
      }
  }

  val publicRoutes: HttpRoutes[F] = routes
  val p2pRoutes: HttpRoutes[F] = HttpRoutes.empty[F]

}

object DataTransactionRoutes extends MakeAdditionalRoutes {

  def mkRoutes[F[_]: Async: SecurityProvider](implicit nodeApi: HttpApi[F]): AdditionalRoutes[F] = new DataTransactionRoutes {}
}


case class MixedCoCell() extends Dex {
  override val name = this.getClass.getName

  def stopLoss(o: Ω): Boolean = true

  override val coData = CoData( // todo make Parlay, Rule objects with mk methods? then get implicits when called from scope of callsite?
    Parlay :: Nil, // Parlay[EnqueueDAGL1Data]((o: EnqueueDAGL1Data) => o) :: Nil,
    DataTransactionRoutes :: Nil,
    Rule :: Nil // new Rule[Ω] { val fcn = (o: Ω) => if (stopLoss(o)) o else new org.reality.kernel.Empty {} } :: Nil
  ) // AdditionalRoutes

  def mkCell[F[_]: Async](
    l1OutputQueue: Queue[F, Signed[DAGBlock]],
    stateChannelOutputQueue: Queue[F, StateChannelOutput]
  ): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = data => {
    val left = L0Cell.mkCell(l1OutputQueue, stateChannelOutputQueue)
    val right = SwapCell.mkCell
    Cell.cellMonoid[F, StackF].combine(left(data), right(data))
  }
}

case class MixedChannelCoCell() extends CoCell {
  def mkCell[F[_]: Async: SecurityProvider: cats.effect.std.Random](
    ctx: BlockConsensusContext[F]
  ): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = data => {
    val test: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = BlockConsensusCell.mkCell(ctx)
    val other: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = EthStateChannel.mkCell
    Cell.cellMonoid[F, StackF].combine(test(data), other(data))
  }
}

case class Tokenomics(tokenName: String, fee: Long, maxValue: Long)

case class SwapPair(tokenTo: Tokenomics, tokenFrom: Tokenomics)

trait Rule[O <: Ω] {
  val fcn: O => O
}

abstract class RuleF[F[_]: Async, O <: Ω] {
  val fcn: O => F[Option[O]]
}

trait MkRule {
  def mkRule[F[_]: Async, O <: Ω](default: O)(implicit storages: L0Storages[F]): RuleF[F, O]

  val stopLossPrice = 100L

  // todo get working with F so we can pass in with mkCell during endpoint or statechannel creation
  def stopLoss[F[_]: Concurrent](o: Ω)(implicit storages: L0Storages[F]): F[Boolean] =
    storages.globalSnapshot.getLatestBalances.map(
      _.get.forall(_._2.value > stopLossPrice)
    ) // todo use internal resource to map over mempool
}

/** Pass in pure function which contains extra logic to add between CoCells, essentially we can oracalize data to make decisions within the
  * pure function
  */
case class Parlay[O <: Ω](fcn: O => O)
    extends Rule[O] //todo carry from the left, pass extra parlay logic to filter l1 consensus based on the logic

object Parlay extends MkRule {
  def mkRule[F[_]: Async, O <: Ω](default: O)(implicit storages: L0Storages[F]): RuleF[F, O] = new RuleF[F, O] {
    val fcn: O => F[Option[O]] = { (o: O) =>
      for {
        t: Option[O] <- stopLoss[F](o).ifM(Option(o).pure[F], Option(default).pure[F])
      } yield t
    }
  }
}

object Rule extends MkRule {
  def mkRule[F[_]: Async, O <: Ω](default: O)(implicit storages: L0Storages[F]): RuleF[F, O] = new RuleF[F, O] {
    val fcn: O => F[Option[O]] = { (o: O) =>
      for {
        t: Option[O] <- stopLoss[F](o).ifM(Option(o).pure[F], Option(default).pure[F])
      } yield t
    }
  }
}

/** Pure functions for fees,swap transfer rules, governance vote countrules.
  */
case class Swap[O <: Ω](val fcn: O => O, swapRules: List[SwapPair])
    extends Rule[O] //todo carry from the left, defines swap pairs passes to L1 and validates that it has them, or rather validates that the sum of all right has them
//todo make the dex api a pre-built interface which new tokens add to when they deploy

/** shit each cocell to the right needs to make data structures for shit on the left
  *
  * @param parlays
  * @param endpoints
  * @param rules
  */
case class CoData(parlays: List[MkRule], endpoints: List[MakeAdditionalRoutes], rules: List[MkRule]) extends Ω {}

/** If you want to actually launch a token you'll undoubdetly want some sort of validatore rewards (mining), transaction app fees, endpoints
  * or even custom logic like only making a swap if its raining in Buffalo (Parlay). All of these are "co-things" to CoCells, which means we
  * can make an API for invoking and applying them, namely we can use a Topos which is a special type of monad which allows us to pass in
  * extra data.
  */
object CoData {
  implicit val monoid = new Monoid[CoData] {
    override def empty: CoData = CoData(Nil, Nil, Nil)
    override def combine(x: CoData, y: CoData): CoData = CoData(x.parlays ++ y.parlays, x.endpoints ++ y.endpoints, x.rules ++ y.rules)
  }
}

class ToposExamples extends Portal {
  import CoCell._
  val combinedL0 = MixedCoCell()

  val combinedStateChannel = MixedChannelCoCell()

  val mergedCells = combinedL0 ++ combinedStateChannel
  val testTopos: Seq[CoTopos] = Topos.run(mergedCells) // _.coCell.newCoCellWithCoData(_.coData)
  val cellProgram = (args: List[String]) =>
    for {
      cellInstancesReduced <- testTopos.map(_.coCell).reduce(_ <~> _).setup(args) // .map(_.coCell.setup(args)).sequence

    } yield cellInstancesReduced :: Nil // ++ cellInstances

  val computeWithStateChannelCell: IO[Option[Either[CellError, Ω]]] = cellProgram(Nil).flatMap { contexts: Seq[CoCell.Context[CoCell]] =>
    val coCellContext = CoCell
      .getCoCellByName(contexts)(MixedCoCell.getClass.getName)
      .map { c: Context[CoCell] =>
        val cellInstance: CoHomF[Ω => Cell[IO, StackF, Ω, Ω, Either[CellError, Ω]]] = c.coFlatMap(_.mkCell[IO])
        val cellProgram = cellInstance.start(c).run()
        cellProgram
      }
      .sequence
    coCellContext
  }

}

Cross Chain Swaps and the Dex API (Experimental)

Now that we know how to construct our own blockchains, we can use the same primitives to monetize them. One novel feature Reality provides is that ability to swap data (tokens) across independent L1s. This allows you to essentially list your own pairing with your blockchain's token and an external token; your blockchain hosts an open order book where swap orders of buy and sell from both sides are matched (this happens within the mempool cache) essentially ICOing your token without having to go through the hoops of listing on exchanges, funding liqudity pools, etc. The Reality ledger is the ultimate source of truth and so part of the fees must be paid in $NET or in most cases also with your blockchian's token as well. These fees are added together using Topos, as well as other Parlays and Rules .

Also this same orderbook can be used to pay for dApps, or a special type of dApp, namely a rApp (Reality dApp), made out of a turing complete executable. Unlike dApps which can only prove rudimentary state update via non-turnig complete smart contract op-codes, rApps operate by generating "Reality Checks". The executable portion of Reality Checks implicitly generate a new type of proof called an execution hash, which generates a dag of hashes state updates of literal program state and can be iterated over and validated to prove the execution of the executable. Reality checks can also integrate with other proofs like zkSnarks, rollups etc. to create and integrated state proof for your particular use case.

The goal here is


abstract class WasmError extends Exception {
  def message: String
}

case class FunctionNotFound(functionName: String) extends WasmError {
  def message: String = s"Function $functionName not found."
}

case class ExecutionError(details: String) extends WasmError {
  def message: String = s"Execution error: $details."
}

case object WasmExecutorUnavailable extends WasmError {
  def message: String = "WasmExecutor instance is not available."
}

class WasmExecutor(input: Ω) extends AutoCloseable {
  private val store: Store[Void] = Store.withoutData()
  private val engine: Engine = store.engine()
  private var module: Option[Module] = None
  private var instance: Option[Instance] = None

  // Setup the WASM module with the given binary
  def setup(wasmBinary: Array[Byte]): Unit = {
    // Clean up previous instance and module if they exist
    instance.foreach(_.close())
    module.foreach(_.close())

    module = Some(Module.fromBinary(engine, wasmBinary))
    instance = Some(new Instance(store, module.get, util.Collections.emptyList()))
  }

  def executeFunction[T, R](name: String, params: T)(
    implicit paramsConverter: T => Val,
    resultConverter: Array[Val] => R
  ): Either[WasmError, R] =
    instance match {
      case Some(inst) =>
        val funcOpt: Optional[Func] = inst.getFunc(store, name)
        Right(resultConverter(funcOpt.get().call(store)))
      case None =>
        Left(WasmExecutorUnavailable)
    }

  /** todo, return Ω => Ω out of executeFunction()
    * @return
    */
  def exeFcn = input

  // Clean up resources
  def close(): Unit = {
    instance.foreach(_.close())
    module.foreach(_.close())
    store.close()
    engine.close()
  }
}

/** make type in CoData _: HttpApi[F] => Http4sDsl[F], make sure it picks up implicit in HttpApi
  * @param async$F$0
  * @param securityProvider$F$1
  * @param nodeApi
  * @tparam F
  */
abstract class DexRoutes[F[_]: Async: SecurityProvider](implicit nodeApi: HttpApi[F]) extends AdditionalRoutes[F] {

  case class SendRecordDataTransactionParams(
    destination: Address,
    fee: TransactionFee,
    data: String
  )

  private def makeSwapTransaction(swapTx: SwapTx, nodeApi: HttpApi[F]): F[Json] = {
    val obj = Json.obj("status" -> Json.fromString("Transaction Sent"), "transactionHash" -> Json.fromString(swapTx.hash.toString))
    for {
      _ <- nodeApi.httpApi.cliApp.run(
        Request[F](method = POST, uri = Uri.unsafeFromString(s"http://localhost:9000/input/$obj"))
      ) // todo enqueue txs
    } yield obj
  }

  def handleActionF(
    nodeApi: HttpApi[F],
    action: String,
    params: Json
  ): F[Json] =
    action match {
      case "swapTxs" =>
        params.as[SwapTx] match {
          case Right(validParams) => makeSwapTransaction(validParams, nodeApi)
          case _                  => Json.obj().pure[F]
          //          case Left(error)        => new Exception(s"Invalid params: $error").pure[F]
        }
      case _ => Json.obj().pure[F]

    }
  private def handleRequest(nodeApi: HttpApi[F], request: Request[F]): F[Either[DecodingFailure, F[Json]]] =
    for {
      json <- request.as[Json]
      action: Result[String] <- json.hcursor.get[String]("action").pure[F]
      payload: Result[Json] <- json.hcursor.get[Json]("payload").pure[F]
      apiResponse: Either[DecodingFailure, F[Json]] <- action.flatMap(str => payload.map(handleActionF(nodeApi, str, _))).pure[F]
    } yield apiResponse

  private val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case req @ POST -> Root / "api" =>
      handleRequest(nodeApi, req).flatMap {
        case Right(result)      => Ok(result)
        case Left(errorMessage) => BadRequest(Json.obj("error" -> Json.fromString(errorMessage.toString())))
      }
  }

  val publicRoutes: HttpRoutes[F] = routes
  val p2pRoutes: HttpRoutes[F] = HttpRoutes.empty[F]

}

object DexRoutes extends MakeAdditionalRoutes {

  def mkRoutes[F[_]: Async: SecurityProvider](implicit nodeApi: HttpApi[F]): AdditionalRoutes[F] = new DataTransactionRoutes {}
}

/** todo fee example where tx validated on subledger to reduce fee on net Lomega ledger and total fees overall
  */
trait Dex extends CoCell {
  val executable = new WasmExecutor(_)
  val netFee = TransactionFee
  val netSwapFee = 1L
  def validateFee(o: Ω): Boolean = true
  def swapFeeTotal(o: Ω) = if (validateFee(o)) o else new Hom[Nothing, Nothing] {}
  override val coData: CoData = CoData(
    Parlay(executable(_).exeFcn) :: Nil, // todo put fee logic in exe?
    DexRoutes :: Nil,
    Swap(swapFeeTotal(_), List(SwapPair(Tokenomics("$net", 1L, 1000000000L), Tokenomics("$eth", 1L, 120024000L)))) :: Nil
  )

}

case class EthSwapDex() extends Dex {

  override val name = this.getClass.getName

  override val coData = CoData(Nil, DataTransactionRoutes :: Nil, Nil) // AdditionalRoutes
  def mkCell[F[_]: Async](
    l1OutputQueue: Queue[F, Signed[DAGBlock]],
    stateChannelOutputQueue: Queue[F, StateChannelOutput]
  ): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = data => {
    val left = L0Cell.mkCell(l1OutputQueue, stateChannelOutputQueue)
    val right = SwapCell.mkCell
    Cell.cellMonoid[F, StackF].combine(left(data), right(data))
  }
}

case class SwapBlock(
  parent: NonEmptyList[BlockReference],
  transactions: NonEmptySet[Transaction]
) extends Ω

class EthCellObj(val ethSwaps: Seq[SwapTx]) extends StateChannelCell {
  val parent1 = BlockReference(Height(4L), ProofsHash("parent1"))
  val parent2 = BlockReference(Height(5L), ProofsHash("parent2"))
  val swapTx = SwapTx(
    Address("DAG8Yy2enxizZdWoipKKZg6VXwk7rY2Z54mJqUdC"),
    Address(
      "DAG8Yy2enxizZdWoipKKZg6VXwk7rY2Z54mJqUdC"
    ), // todo need wrapper or parent Address trait, want EthAddress here (Address checks validity at instantiation)
    TransactionAmount(PosLong(1L)),
    TransactionAmount(PosLong(1L)),
    TransactionAmount(PosLong(1L)),
    TransactionFee(PosLong(1L)),
    TransactionReference.empty,
    TransactionSalt(0L)
  )
  val ethSideTxHashes = List("")

  def mkCell[F[_]: Async: SecurityProvider: Random](
    ctx: BlockConsensusContext[F]
  ): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = data =>
    new Cell[F, StackF, Ω, Ω, Either[CellError, Ω]](
      data,
      scheme.hyloM(
        AlgebraM[F, StackF, Either[CellError, Ω]] {
          case More(a) => a.pure[F]
          case Done(Right(cmd: AlgebraCommand)) =>
            cmd match {
              case ProcessSwaps(s, o) =>
                val res = SwapBlock(NonEmptyList.one(parent2), NonEmptySet.fromSetUnsafe(SortedSet.from(s))).pure[F]
                res.map(_.asRight[CellError].widen[Ω])

              case InformAboutRoundStartFailure(message) =>
                val res: F[Either[CellError, Ω]] = CellError(message).asLeft[Ω].pure[F]
                res
              case _ => NullTerminal.asRight[CellError].widen[Ω].pure[F]
            }
          case Done(other) => other.pure[F]
        },
        CoalgebraM[F, StackF, Ω] {
          case EnqueueETHBlock(data: OmegaEthBlockWrapper) => 
            if (ethSideTxHashes.exists(data.ethBlock.getBlock.getTransactions.contains(_)))
              Applicative[F].pure(Done(InformAboutRoundStartFailure("").asRight[CellError]))
            else Applicative[F].pure(Done(ProcessSwaps(ethSwaps.toList, data :: Nil).asRight[CellError]))
          case EnqueueETHSwaps(swaps: SwapWrapper) => // todo add lookups in if (ctx.transactionStorage.waitingTransactions)
            if (ethSideTxHashes.exists(txHash => swaps.swaps.exists(_.parent.hash.value == txHash)))
              Applicative[F].pure(Done(InformAboutRoundStartFailure("").asRight[CellError]))
            else Applicative[F].pure(Done(ProcessSwaps(swaps.swaps, Nil).asRight[CellError]))

          //          case ProcessStateChannelSnapshot(snapshot) => Coalgebra.processStateChannelSnapshot(snapshot)
          case _ => Applicative[F].pure(Done(org.reality.dag.l1.domain.consensus.block.AlgebraCommand.NoAction.asRight[CellError]))
        }
      ),
      {
        case d: OmegaEthBlockWrapper =>
          println(s"EthCellObj OmegaEthBlockWrapper $d")
          EnqueueETHBlock(d)
        case swaps: SwapWrapper =>
          println(s"EthCellObj SwapWrapper $swaps")

          EnqueueETHSwaps(swaps)
        case a @ _ =>
          println(s"EthCellObj a ${a}")
          org.reality.dag.l1.domain.consensus.block.CoalgebraCommand.Empty()
      }
    )
}
class MkEthStateChannel(val swapTxs: List[SwapTx]) extends MkStateChannel {
  println("creating EthStateChannel")

  val cellObj: StateChannelCell = new EthCellObj(swapTxs)
  def make[F[_]: Async: SecurityProvider: Random](
    appConfig: AppConfig,
    keyPair: KeyPair,
    p2pClient: L1P2PClient[F],
    programs: L1Programs[F],
    queues: L1Queues[F],
    selfId: PeerId,
    services: L1Services[F],
    storages: L1Storages[F],
    validators: Validators[F],
    mkCell: StateChannelCell
  ): F[StateChannel[F, _, _]] =
    for {
      blockAcceptanceS <- Semaphore(1)
      blockCreationS <- Semaphore(1)
      blockStoringS <- Semaphore(1)
    } yield
      new EthStateChannel[F](
        appConfig,
        blockAcceptanceS,
        blockCreationS,
        blockStoringS,
        keyPair,
        p2pClient,
        programs,
        queues,
        selfId,
        services,
        storages,
        validators,
        mkCell
      )

  implicit val traverse: Traverse[EthCoHom] = new DefaultTraverse[EthCoHom] {
    override def traverse[G[_]: Applicative, A, B](fa: EthCoHom[A])(f: A => G[B]): G[EthCoHom[B]] =
      fa.asInstanceOf[EthCoHom[B]].pure[G]
  }
}

case class EthSwapStatechannelDex(val swapTxs: List[SwapTx]) extends Dex {
  val stateChannel: MkStateChannel = new MkEthStateChannel(swapTxs)
  override def mkResources[A <: CliMethod]: (A, SDK[IO]) => Resource[IO, HttpApi[IO]] =
    L1HttpApi.mkResources(_: A, _: SDK[IO])
  def mkCell[F[_]: Async: SecurityProvider: cats.effect.std.Random](
    ctx: BlockConsensusContext[F]
  ): Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = data => {
    val test: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = BlockConsensusCell.mkCell(ctx)
    val other: Ω => Cell[F, StackF, Ω, Ω, Either[CellError, Ω]] = stateChannel.mkCell[F](ctx)
    Cell.cellMonoid[F, StackF].combine(test(data), other(data))
  }
  override def setup(args: List[String]) =
    setupCombined[org.reality.dag.l1.cli.Run](
      stateChannel :: Nil,
      this,
      argsToStartUp(args).l1method,
      org.reality.dag.l1.cli.method.opts,
      L1HttpApi.mkResources(_: org.reality.dag.l1.cli.Run, _: SDK[IO])
    )


class DexAPIExample {class DexAPIExample extends Portal {

  val swapTxRight = SwapTx(
    Address("DAG8Yy2enxizZdWoipKKZg6VXwk7rY2Z54mJqUdC"),
    Address(
      "DAG8Yy2enxizZdWoipKKZg6VXwk7rY2Z54mJqUdC"
    ),
    TransactionAmount(PosLong(1L)),
    TransactionAmount(PosLong(1L)),
    TransactionAmount(PosLong(1L)),
    TransactionFee(PosLong(1L)),
    TransactionReference.empty,
    TransactionSalt(0L)
  )

  val swapTxLeft = SwapTx(
    Address("DAG8Yy2enxizZdWoipKKZg6VXwk7rY2Z54mJqUdC"),
    Address(
      "DAG8Yy2enxizZdWoipKKZg6VXwk7rY2Z54mJqUdC" 
    ),
    TransactionAmount(PosLong(1L)),
    TransactionAmount(PosLong(1L)),
    TransactionAmount(PosLong(1L)),
    TransactionFee(PosLong(1L)),
    TransactionReference.empty,
    TransactionSalt(0L)
  )
  /** //this step happens on ethereum, using web3j, get block with eth-validated result then pass below. Internally, when the block is
    * accepted into a snapshot, the Net funds will be transferred upon snapshot acceptance.
    */
//  val transactionReceipt =
//    Transfer.sendFunds(web3, credentials, "0x<address>|<ensName>", java.math.BigDecimal.valueOf(1.0), Convert.Unit.ETHER).send();
  val ethSwapDemo = CombinedL0() ++ EthSwapStatechannelDex(swapTxRight :: swapTxRight :: Nil)

  val cellProgram = (args: List[String]) =>
    for {
      cellInstancesReduced <- ethSwapDemo.map(_.setup(args)).sequence
    } yield cellInstancesReduced

}


Towards a higher level language (Experimental)

As all of the examples above construct a program which is then interpreted into a runtime, we can consider the cell-api as something like a "distributed interpreter", which interprets state data across networks monadically giving introspection on a scope (programmatic, like accessing objects and members) which contains said state data.

Well there is one more reason why Cells are constructed using recursion schemes: a higher level language, or rather the AST of one, can be compiled down to the same runtime of List[CoCell] using the transverse operator (.trans() in Droste) which takes the nested representation (AST) and converts it to a flat runtime.

The core of this functionality lives in MutuallyRecursive.scala, specifically in transHomToList(), which actually takes in a nested TypedCoCell (AST node) object and recursively flattens into List[CoCell]. An example lives in BabelExample.scala, which takes a chain of cocells (CoHomF => CoHomF) and returns one CoHomF which is the start of the AST. The CoHomF can then be converted to List[CoCell] via CoCell.coHomToListCoCell[Ω] and added to the runtime.



class BabelExample extends Portal {
  import org.reality.combined.CoCell._
  println("correctMain")
  val netL0 = new NetL0 {}
  val combinedStateChannel = new NetStateChannel {}

  val computationObject: Ω => CoHomF[Ω] = new CompositeCell {}.ohmToCoHom >>> netL0.ohmToCoHom >>> combinedStateChannel.ohmToCoHom

  val test1: CoHomF[Ω] => CoHomF[Ω] = netL0.mixHomCoHom[Ω] >>> combinedStateChannel.mixHomCoHom[Ω]

  val tryChain: CoHomF[Ω] = new CompositeCell {} <~> netL0 <~> combinedStateChannel
  val tryChain2 = new CompositeCell {} <~> netL0 <~> combinedStateChannel

  val composeChain: CoHomF[Ω] => CoHomF[Ω] = tryChain.mixHomCoHom[Ω] >>> tryChain2.mixHomCoHom[Ω]

  def compile(terminal: CoHomF[Ω], chain: CoHomF[Ω] => CoHomF[Ω]) = chain(terminal)

  /** CoProduct for Portal.fixed
    */
  val cellProgram = (args: List[String]) =>
    for {
      cellInstances <- CoCell.coHomToListCoCell[Ω](compile(new CompositeCell {}, test1)).map(_.setup(args)).sequence
    } yield cellInstances

}

rApp deployment, launching your token (deprecated)

This covers the basic flow to test uploading a binary to create a state channel and then sending some input to it.

# root directory

sbt compile
sbt publishLocal
sbt publishM2

# examples/empty

sbt package

This will create a jar file in examples/empty/target/scala-2.13/empty_2.13-0.1.0-SNAPSHOT.jar

To upload this and create a state channel, run the following:


# examples/empty/target/scala-2.13/

curl -X POST localhost:8080/state-channel-jar --data-binary @empty_2.13-0.1.0-SNAPSHOT.jar

To create test input for the state channel, run the following file from IntelliJ:

src/test/scala/org/tessellation/aci/GeneratePayloadTest.scala

This will create a file in /tmp/state-channel-input

To send this input to the state channel, run

# root directory
curl -X POST localhost:8080/input/DAG3k3VihUWMjse9LE93jRqZLEuwGd6a5Ypk4zYS --data-binary @/tmp/state-channel-input

Was this page helpful?